Addendum:

I think we would also face the same problem with the approach John outlined
earlier (using the record cache as a transaction buffer and flushing it
straight to SST files). This is because the record cache (the ThreadCache
class) is not thread-safe, so every commit would invalidate open IQ
Iterators in the same way that RocksDB WriteBatches do.
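
To illustrate the failure mode, here is a minimal single-threaded sketch in
plain Java. It is an analogy only: BufferIteratorSketch and its methods are
hypothetical names, a java.util.TreeMap stands in for the transaction buffer,
and neither ThreadCache nor RocksDB is involved. It shows an iterator that was
opened before a commit becoming unusable once the commit clears the buffer.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a non-thread-safe transaction buffer.
// This is NOT the real ThreadCache or a RocksDB WriteBatch.
final class BufferIteratorSketch {
    // Uncommitted writes (the "transaction buffer").
    private final TreeMap<String, String> buffer = new TreeMap<>();
    // Stands in for the underlying database.
    private final TreeMap<String, String> committed = new TreeMap<>();

    void put(String key, String value) {
        buffer.put(key, value);
    }

    // What an Interactive Query under READ_UNCOMMITTED would hold open.
    Iterator<Map.Entry<String, String>> openIterator() {
        return buffer.entrySet().iterator();
    }

    // What the StreamThread does on commit: write the buffer out, then clear it.
    void commit() {
        committed.putAll(buffer);
        buffer.clear();
    }

    // Returns true if an iterator opened before commit() can still advance after it.
    static boolean iteratorSurvivesCommit() {
        BufferIteratorSketch store = new BufferIteratorSketch();
        store.put("a", "1");
        store.put("b", "2");

        Iterator<Map.Entry<String, String>> it = store.openIterator();
        it.next();      // the query has read "a" and is positioned before "b"
        store.commit(); // the buffer is cleared underneath the open iterator

        try {
            it.next();  // TreeMap's fail-fast iterator detects the modification
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("iterator survives commit: " + iteratorSurvivesCommit());
    }
}
```

TreeMap at least fails fast here. I would not assume that a buffer shared
between the StreamThread and a query thread fails this cleanly.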
--
Nick

On Wed, 13 Sept 2023 at 16:58, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Bruno,
>
> I've updated the KIP based on our conversation. The only things I've not
> yet done are:
>
> 1. Using transactions under ALOS and EOS.
> 2. Making IsolationLevel a query-time constraint, rather than linking it
> to the processing.guarantee.
>
> There's a wrinkle that makes this a challenge: Interactive Queries that
> open an Iterator, when using transactions and READ_UNCOMMITTED.
> The problem is that under READ_UNCOMMITTED, queries need to be able to
> read records from the currently uncommitted transaction buffer
> (WriteBatch). This includes Iterators, which should iterate over both the
> transaction buffer and the underlying database (using
> WriteBatchWithIndex#newIteratorWithBase()).
>
> The issue is that when the StreamThread commits, it writes the current
> WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the
> WriteBatch while an Interactive Query holds an open Iterator on it will
> invalidate the Iterator. Worse, it turns out that Iterators over a
> WriteBatch become invalidated not just when the WriteBatch is cleared, but
> also when the Iterators' current key receives a new write.
>
> Now that I'm writing this, I remember that this is the major reason that I
> switched the original design from having a query-time IsolationLevel to
> having the IsolationLevel linked to the transactionality of the stores
> themselves.
>
> It *might* be possible to resolve this, by having a "chain" of
> WriteBatches, with the StreamThread switching to a new WriteBatch whenever
> a new Interactive Query attempts to read from the database, but that could
> cause some performance problems/memory pressure when subjected to a high
> Interactive Query load. It would also reduce the efficiency of WriteBatches
> on-commit, as we'd have to write N WriteBatches, where N is the number of
> Interactive Queries since the last commit.
>
> I realise this is getting into the weeds of the implementation, and you'd
> rather we focus on the API for now, but I think it's important to consider
> how to implement the desired API, in case we come up with an API that
> cannot be implemented efficiently, or even at all!
>
> Thoughts?
> --
> Nick
>
> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <cado...@apache.org> wrote:
>
>> Hi Nick,
>>
>> 6.
>> Of course, you are right! My bad!
>> Wiping out the state in the downgrading case is fine.
>>
>>
>> 3a.
>> Focus on the public facing changes for the KIP. We will manage to get
>> the internals right. Regarding state stores that do not support
>> READ_COMMITTED, they should throw an error stating that they do not
>> support READ_COMMITTED. No need to adapt all state stores immediately.
>>
>> 3b.
>> I am in favor of using transactions also for ALOS.
>>
>>
>> Best,
>> Bruno
>>
>> On 9/13/23 11:57 AM, Nick Telford wrote:
>> > Hi Bruno,
>> >
>> > Thanks for getting back to me!
>> >
>> > 2.
>> > The fact that implementations can always track estimated memory usage in
>> > the wrapper is a good point. I can remove -1 as an option, and I'll
>> clarify
>> > the JavaDoc that 0 is not just for non-transactional stores, which is
>> > currently misleading.
>> >
>> > 6.
>> > The problem with catching the exception in the downgrade process is that
>> > it would require new code in the Kafka version being downgraded to. Since
>> > users could conceivably downgrade to almost *any* older version of Kafka
>> > Streams, I'm not sure how we could add that code?
>> > The only way I can think of doing it would be to provide a dedicated
>> > downgrade tool, that goes through every local store and removes the
>> > offsets column families. But that seems like an unnecessary amount of
>> extra
>> > code to maintain just to handle a somewhat niche situation, when the
>> > alternative (automatically wipe and restore stores) should be
>> acceptable.
>> >
>> > 1, 4, 5: Agreed. I'll make the changes you've requested.
>> >
>> > 3a.
>> > I agree that IsolationLevel makes more sense at query-time, and I
>> actually
>> > initially attempted to place the IsolationLevel at query-time, but I ran
>> > into some problems:
>> > - The key issue is that, under ALOS we're not staging writes in
>> > transactions, so can't serve reads at the READ_COMMITTED isolation
>> > level. However, this may be addressed if we decide to *always* use
>> > transactions as discussed under 3b.
>> > - IQv1 and IQv2 have quite different implementations. I remember having
>> > some difficulty understanding the IQv1 internals, which made it
>> difficult
>> > to determine what needed to be changed. However, I *think* this can be
>> > addressed for both implementations by wrapping the RocksDBStore in an
>> > IsolationLevel-dependent wrapper, that overrides read methods (get,
>> etc.)
>> > to either read directly from the database or from the ongoing
>> transaction.
>> > But IQv1 might still be difficult.
>> > - If IsolationLevel becomes a query constraint, then all other
>> StateStores
>> > will need to respect it, including the in-memory stores. This would
>> require
>> > us to adapt in-memory stores to stage their writes so they can be
>> isolated
>> > from READ_COMMITTED queries. It would also become an important
>> > consideration for third-party stores on upgrade, as without changes,
>> they
>> > would not support READ_COMMITTED queries correctly.
>> >
>> > Ultimately, I may need some help making the necessary change to IQv1 to
>> > support this, but I don't think it's fundamentally impossible, if we
>> want
>> > to pursue this route.
>> >
>> > 3b.
>> > The main reason I chose to keep ALOS un-transactional was to minimize
>> > behavioural change for most users (I believe most Streams users use the
>> > default configuration, which is ALOS). That said, it's clear that if
>> ALOS
>> > also used transactional stores, the only change in behaviour would be
>> that
>> > it would become *more correct*, which could be considered a "bug fix" by
>> > users, rather than a change they need to handle.
>> >
>> > I believe that performance using transactions (aka. RocksDB
>> WriteBatches)
>> > should actually be *better* than the un-batched write-path that is
>> > currently used[1]. The only "performance" consideration will be the
>> > increased memory usage that transactions require. Given the mitigations
>> for
>> > this memory that we have in place, I would expect that this is not a
>> > problem for most users.
>> >
>> > If we're happy to do so, we can make ALOS also use transactions.
>> >
>> > Regards,
>> > Nick
>> >
>> > Link 1:
>> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>> >
>> > On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org>
>> wrote:
>> >
>> >> Hi Nick,
>> >>
>> >> Thanks for the updates and sorry for the delay on my side!
>> >>
>> >>
>> >> 1.
>> >> Making the default implementation for flush() a no-op sounds good to
>> me.
>> >>
>> >>
>> >> 2.
>> >> I think what was bugging me here is that a third-party state store
>> needs
>> >> to implement the state store interface. That means they need to
>> >> implement a wrapper around the actual state store as we do for RocksDB
>> >> with RocksDBStore. So, a third-party state store can always estimate
>> the
>> >> uncommitted bytes, if it wants, because the wrapper can record the
>> added
>> >> bytes.
>> >> One case I can think of where returning -1 makes sense is when Streams
>> >> does not need to estimate the size of the write batch and trigger
>> >> extraordinary commits, because the third-party state store takes care
>> of
>> >> memory. But in that case the method could also just return 0. Even that
>> >> case would be better solved with a method that returns whether the state
>> >> store itself manages the memory used for uncommitted bytes.
>> >> That said, I am fine with keeping the -1 return value; I was just
>> >> wondering when and if it will be used.
>> >>
>> >> Regarding returning 0 for transactional state stores when the batch is
>> >> empty, I was just wondering because you explicitly stated
>> >>
>> >> "or {@code 0} if this StateStore does not support transactions."
>> >>
>> >> So it seemed to me returning 0 could only happen for non-transactional
>> >> state stores.
>> >>
>> >>
>> >> 3.
>> >>
>> >> a) What do you think if we move the isolation level to IQ (v1 and v2)?
>> >> In the end this is the only component that really needs to specify the
>> >> isolation level. It is similar to the Kafka consumer that can choose
>> >> with what isolation level to read the input topic.
>> >> For IQv1 the isolation level should go into StoreQueryParameters. For
>> >> IQv2, I would add it to the Query interface.
>> >>
>> >> b) Point a) raises the question of what should happen during at-least-once
>> >> processing when the state store does not use transactions? John in the
>> >> past proposed to also use transactions on state stores for
>> >> at-least-once. I like that idea, because it avoids aggregating the same
>> >> records over and over again in the case of a failure. We had a case in
>> >> the past where a Streams application in at-least-once mode was failing
>> >> continuously, for some reason I do not remember, before committing the
>> >> offsets. After each failover, the app aggregated the same records again
>> >> and again. Of course the aggregate increased to very wrong values
>> >> just because of the failover. With transactions on the state stores we
>> >> could have avoided this. The app would have output the same aggregate
>> >> multiple times (i.e., after each failover) but at least the value of
>> the
>> >> aggregate would not depend on the number of failovers. Outputting the
>> >> same aggregate multiple times would be incorrect under exactly-once but
>> >> it is OK for at-least-once.
>> >> Whether it makes sense to add a config to enable or disable transactions
>> >> on state stores under at-least-once, or to just use transactions in any
>> >> case, is a question we should also discuss in this KIP. It depends a bit
>> >> on the performance trade-off. To be safe, I would add a config.
>> >>
>> >>
>> >> 4.
>> >> Your points are all valid. I tend to say to keep the metrics around
>> >> flush() until we remove flush() completely from the interface. Calls to
>> >> flush() might still exist since existing processors might still call
>> >> flush() explicitly as you mentioned in 1). For sure, we need to
>> document
>> >> how the metrics change due to the transactions in the upgrade notes.
>> >>
>> >>
>> >> 5.
>> >> I see. Then you should describe how the .position files are handled in
>> >> a dedicated section of the KIP or incorporate the description in the
>> >> "Atomic Checkpointing" section instead of only mentioning it in the
>> >> "Compatibility, Deprecation, and Migration Plan".
>> >>
>> >>
>> >> 6.
>> >> Describing upgrading and downgrading in the KIP is a good idea.
>> >> Regarding downgrading, I think you could also catch the exception and
>> do
>> >> what is needed to downgrade, e.g., drop the column family. See here for
>> >> an example:
>> >>
>> >>
>> >>
>> https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
>> >>
>> >> It is a bit brittle, but it works.
>> >>
>> >>
>> >> Best,
>> >> Bruno
>> >>
>> >>
>> >> On 8/24/23 12:18 PM, Nick Telford wrote:
>> >>> Hi Bruno,
>> >>>
>> >>> Thanks for taking the time to review the KIP. I'm back from leave now
>> and
>> >>> intend to move this forwards as quickly as I can.
>> >>>
>> >>> Addressing your points:
>> >>>
>> >>> 1.
>> >>> Because flush() is part of the StateStore API, it's exposed to custom
>> >>> Processors, which might be making calls to flush(). This was actually
>> the
>> >>> case in a few integration tests.
>> >>> To maintain as much compatibility as possible, I'd prefer not to make
>> >> this
>> >>> an UnsupportedOperationException, as it will cause previously working
>> >>> Processors to start throwing exceptions at runtime.
>> >>> I agree that it doesn't make sense for it to proxy commit(), though,
>> as
>> >>> that would cause it to violate the "StateStores commit only when the
>> Task
>> >>> commits" rule.
>> >>> Instead, I think we should make this a no-op. That way, existing user
>> >>> Processors will continue to work as-before, without violation of store
>> >>> consistency that would be caused by premature flush/commit of
>> StateStore
>> >>> data to disk.
>> >>> What do you think?
>> >>>
>> >>> 2.
>> >>> As stated in the JavaDoc, when a StateStore implementation is
>> >>> transactional, but is unable to estimate the uncommitted memory usage,
>> >> the
>> >>> method will return -1.
>> >>> The intention here is to permit third-party implementations that may
>> not
>> >> be
>> >>> able to estimate memory usage.
>> >>>
>> >>> Yes, it will be 0 when nothing has been written to the store yet. I
>> >> thought
>> >>> that was implied by "This method will return an approximation of the
>> >> memory
>> >>> would be freed by the next call to {@link #commit(Map)}" and "@return
>> The
>> >>> approximate size of all records awaiting {@link #commit(Map)}",
>> however,
>> >> I
>> >>> can add it explicitly to the JavaDoc if you think this is unclear?
>> >>>
>> >>> 3.
>> >>> I realise this is probably the most contentious point in my design,
>> and
>> >> I'm
>> >>> open to changing it if I'm unable to convince you of the benefits.
>> >>> Nevertheless, here's my argument:
>> >>> The Interactive Query (IQ) API(s) are directly provided StateStores to
>> >>> query, and it may be important for users to programmatically know
>> which
>> >>> mode the StateStore is operating under. If we simply provide an
>> >>> "eosEnabled" boolean (as used throughout the internal streams
>> engine), or
>> >>> similar, then users will need to understand the operation and
>> >> consequences
>> >>> of each available processing mode and how it pertains to their
>> >> StateStore.
>> >>>
>> >>> Interactive Query users aren't the only people that care about the
>> >>> processing.guarantee/IsolationLevel of a StateStore: implementers of custom
>> >>> StateStores also need to understand the behaviour expected of their
>> >>> implementation. KIP-892 introduces some assumptions into the Streams
>> >> Engine
>> >>> about how StateStores operate under each processing mode, and it's
>> >>> important that custom implementations adhere to those assumptions in
>> >> order
>> >>> to maintain the consistency guarantees.
>> >>>
>> >>> IsolationLevels provide a high-level contract on the behaviour of the
>> >>> StateStore: a user knows that under READ_COMMITTED, they will see
>> writes
>> >>> only after the Task has committed, and under READ_UNCOMMITTED they
>> will
>> >> see
>> >>> writes immediately. No understanding of the details of each
>> >>> processing.guarantee is required, either for IQ users or StateStore
>> >>> implementers.
>> >>>
>> >>> An argument can be made that these contractual guarantees can simply
>> be
>> >>> documented for the processing.guarantee (i.e. that exactly-once and
>> >>> exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves
>> like
>> >>> READ_UNCOMMITTED), but there are several small issues with this I'd
>> >> prefer
>> >>> to avoid:
>> >>>
>> >>>      - Where would we document these contracts, in a way that is
>> difficult
>> >>>      for users/implementers to miss/ignore?
>> >>>      - It's not clear to users that the processing mode is
>> communicating
>> >>>      an expectation of read isolation, unless they read the
>> >> documentation. Users
>> >>>      rarely consult documentation unless they feel they need to, so
>> it's
>> >> likely
>> >>>      this detail would get missed by many users.
>> >>>      - It tightly couples processing modes to read isolation. Adding
>> new
>> >>>      processing modes, or changing the read isolation of existing
>> >> processing
>> >>>      modes would be difficult/impossible.
>> >>>
>> >>> Ultimately, the cost of introducing IsolationLevels is just a single
>> >>> method, since we re-use the existing IsolationLevel enum from Kafka.
>> This
>> >>> gives us a clear place to document the contractual guarantees expected
>> >>> of/provided by StateStores, that is accessible both by the StateStore
>> >>> itself, and by IQ users.
>> >>>
>> >>> (Writing this I've just realised that the StateStore and IQ APIs
>> actually
>> >>> don't provide access to StateStoreContext that IQ users would have
>> direct
>> >>> access to... Perhaps StateStore should expose isolationLevel() itself
>> >> too?)
>> >>>
>> >>> 4.
>> >>> Yeah, I'm not comfortable renaming the metrics in-place either, as
>> it's a
>> >>> backwards incompatible change. My concern is that, if we leave the
>> >> existing
>> >>> "flush" metrics in place, they will be confusing to users. Right now,
>> >>> "flush" metrics record explicit flushes to disk, but under KIP-892,
>> even
>> >> a
>> >>> commit() will not explicitly flush data to disk - RocksDB will decide
>> on
>> >>> when to flush memtables to disk itself.
>> >>>
>> >>> If we keep the existing "flush" metrics, we'd have two options, which
>> >> both
>> >>> seem pretty bad to me:
>> >>>
>> >>>      1. Have them record calls to commit(), which would be
>> misleading, as
>> >>>      data is no longer explicitly "flushed" to disk by this call.
>> >>>      2. Have them record nothing at all, which is equivalent to
>> removing
>> >> the
>> >>>      metrics, except that users will see the metric still exists and
>> so
>> >> assume
>> >>>      that the metric is correct, and that there's a problem with their
>> >> system
>> >>>      when there isn't.
>> >>>
>> >>> I agree that removing them is also a bad solution, and I'd like some
>> >>> guidance on the best path forward here.
>> >>>
>> >>> 5.
>> >>> Position files are updated on every write to a StateStore. Since our
>> >> writes
>> >>> are now buffered until commit(), we can't update the Position file
>> until
>> >>> commit() has been called, otherwise it would be inconsistent with the
>> >> data
>> >>> in the event of a rollback. Consequently, we need to manage these
>> offsets
>> >>> the same way we manage the checkpoint offsets, and ensure they're only
>> >>> written on commit().
>> >>>
>> >>> 6.
>> >>> Agreed, although I'm not exactly sure yet what tests to write. How
>> >> explicit
>> >>> do we need to be here in the KIP?
>> >>>
>> >>> As for upgrade/downgrade: upgrade is designed to be seamless, and we
>> >> should
>> >>> definitely add some tests around that. Downgrade, it transpires, isn't
>> >>> currently possible, as the extra column family for offset storage is
>> >>> incompatible with the pre-KIP-892 implementation: when you open a
>> RocksDB
>> >>> database, you must open all available column families or receive an
>> >> error.
>> >>> What currently happens on downgrade is that it attempts to open the
>> >> store,
>> >>> throws an error about the offsets column family not being opened,
>> which
>> >>> triggers a wipe and rebuild of the Task. Given that downgrades should
>> be
>> >>> uncommon, I think this is acceptable behaviour, as the end-state is
>> >>> consistent, even if it results in an undesirable state restore.
>> >>>
>> >>> Should I document the upgrade/downgrade behaviour explicitly in the
>> KIP?
>> >>>
>> >>> --
>> >>>
>> >>> Regards,
>> >>> Nick
>> >>>
>> >>>
>> >>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org>
>> wrote:
>> >>>
>> >>>> Hi Nick!
>> >>>>
>> >>>> Thanks for the updates!
>> >>>>
>> >>>> 1.
>> >>>> Why does StateStore#flush() default to
>> >>>> StateStore#commit(Collections.emptyMap())?
>> >>>> Since calls to flush() will not exist anymore after this KIP is
>> >>>> released, I would rather throw an unsupported operation exception by
>> >>>> default.
>> >>>>
>> >>>>
>> >>>> 2.
>> >>>> When would a state store return -1 from
>> >>>> StateStore#approximateNumUncommittedBytes() while being
>> transactional?
>> >>>>
>> >>>> Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if
>> >>>> the state store is transactional but nothing has been written to the
>> >>>> state store yet?
>> >>>>
>> >>>>
>> >>>> 3.
>> >>>> Sorry for bringing this up again. Does this KIP really need to
>> introduce
>> >>>> StateStoreContext#isolationLevel()? StateStoreContext has already
>> >>>> appConfigs() which basically exposes the same information, i.e., if
>> EOS
>> >>>> is enabled or not.
>> >>>> In one of your previous e-mails you wrote:
>> >>>>
>> >>>> "My idea was to try to keep the StateStore interface as loosely
>> coupled
>> >>>> from the Streams engine as possible, to give implementers more
>> freedom,
>> >>>> and reduce the amount of internal knowledge required."
>> >>>>
>> >>>> While I understand the intent, I doubt that it decreases the
>> coupling of
>> >>>> a StateStore interface and the Streams engine. READ_COMMITTED only
>> >>>> applies to IQ but not to reads by processors. Thus, implementers
>> need to
>> >>>> understand how Streams accesses the state stores.
>> >>>>
>> >>>> I would like to hear what others think about this.
>> >>>>
>> >>>>
>> >>>> 4.
>> >>>> Great exposing new metrics for transactional state stores! However, I
>> >>>> would prefer to add new metrics and deprecate (in the docs) the old
>> >>>> ones. You can find examples of deprecated metrics here:
>> >>>> https://kafka.apache.org/documentation/#selector_monitoring
>> >>>>
>> >>>>
>> >>>> 5.
>> >>>> Why does the KIP mention position files? I do not think they are
>> related
>> >>>> to transactions or flushes.
>> >>>>
>> >>>>
>> >>>> 6.
>> >>>> I think we will also need to adapt/add integration tests besides unit
>> >>>> tests. Additionally, we probably need integration or system tests to
>> >>>> verify that upgrades and downgrades between transactional and
>> >>>> non-transactional state stores work as expected.
>> >>>>
>> >>>>
>> >>>> Best,
>> >>>> Bruno
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> On 7/21/23 10:34 PM, Nick Telford wrote:
>> >>>>> One more thing: I noted John's suggestion in the KIP, under
>> "Rejected
>> >>>>> Alternatives". I still think it's an idea worth pursuing, but I
>> believe
>> >>>>> that it's out of the scope of this KIP, because it solves a
>> different
>> >> set
>> >>>>> of problems to this KIP, and the scope of this one has already grown
>> >>>> quite
>> >>>>> large!
>> >>>>>
>> >>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>>> Hi everyone,
>> >>>>>>
>> >>>>>> I've updated the KIP (
>> >>>>>>
>> >>>>
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> >>>> )
>> >>>>>> with the latest changes; mostly bringing back "Atomic
>> Checkpointing"
>> >>>> (for
>> >>>>>> what feels like the 10th time!). I think the one thing missing is
>> some
>> >>>>>> changes to metrics (notably the store "flush" metrics will need to
>> be
>> >>>>>> renamed to "commit").
>> >>>>>>
>> >>>>>> The reason I brought back Atomic Checkpointing was to decouple
>> store
>> >>>> flush
>> >>>>>> from store commit. This is important, because with Transactional
>> >>>>>> StateStores, we now need to call "flush" on *every* Task commit,
>> and
>> >> not
>> >>>>>> just when the StateStore is closing, otherwise our transaction
>> buffer
>> >>>> will
>> >>>>>> never be written and persisted, instead growing unbounded! I
>> >>>> experimented
>> >>>>>> with some simple solutions, like forcing a store flush whenever the
>> >>>>>> transaction buffer was likely to exceed its configured size, but
>> this
>> >>>> was
>> >>>>>> brittle: it prevented the transaction buffer from being configured
>> to
>> >> be
>> >>>>>> unbounded, and it still would have required explicit flushes of
>> >> RocksDB,
>> >>>>>> yielding sub-optimal performance and memory utilization.
>> >>>>>>
>> >>>>>> I deemed Atomic Checkpointing to be the "right" way to resolve this
>> >>>>>> problem. By ensuring that the changelog offsets that correspond to
>> the
>> >>>> most
>> >>>>>> recently written records are always atomically written to the
>> >> StateStore
>> >>>>>> (by writing them to the same transaction buffer), we can avoid
>> >> forcibly
>> >>>>>> flushing the RocksDB memtables to disk, letting RocksDB flush them
>> >> only
>> >>>>>> when necessary, without losing any of our consistency guarantees.
>> See
>> >>>> the
>> >>>>>> updated KIP for more info.
>> >>>>>>
>> >>>>>> I have fully implemented these changes, although I'm still not
>> >> entirely
>> >>>>>> happy with the implementation for segmented StateStores, so I plan
>> to
>> >>>>>> refactor that. Despite that, all tests pass. If you'd like to try
>> out
>> >> or
>> >>>>>> review this highly experimental and incomplete branch, it's
>> available
>> >>>> here:
>> >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note:
>> it's
>> >>>> built
>> >>>>>> against Kafka 3.5.0 so that I had a stable base to build and test
>> it
>> >> on,
>> >>>>>> and to enable easy apples-to-apples comparisons in a live
>> >> environment. I
>> >>>>>> plan to rebase it against trunk once it's nearer completion and has
>> >> been
>> >>>>>> proven on our main application.
>> >>>>>>
>> >>>>>> I would really appreciate help in reviewing and testing:
>> >>>>>> - Segmented (Versioned, Session and Window) stores
>> >>>>>> - Global stores
>> >>>>>>
>> >>>>>> I do not currently use either of these, so my primary test environment
>> >>>>>> doesn't test these areas.
>> >>>>>>
>> >>>>>> I'm going on Parental Leave starting next week for a few weeks, so
>> >> will
>> >>>>>> not have time to move this forward until late August. That said,
>> your
>> >>>>>> feedback is welcome and appreciated, I just won't be able to
>> respond
>> >> as
>> >>>>>> quickly as usual.
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Nick
>> >>>>>>
>> >>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com>
>> >>>> wrote:
>> >>>>>>
>> >>>>>>> Hi Bruno
>> >>>>>>>
>> >>>>>>> Yes, that's correct, although the impact on IQ is not something I
>> had
>> >>>>>>> considered.
>> >>>>>>>
>> >>>>>>> What about atomically updating the state store from the
>> transaction
>> >>>>>>>> buffer every commit interval and writing the checkpoint (thus,
>> >>>> flushing
>> >>>>>>>> the memtable) every configured amount of data and/or number of
>> >> commit
>> >>>>>>>> intervals?
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>> I'm not quite sure I follow. Are you suggesting that we add an
>> >>>> additional
>> >>>>>>> config for the max number of commit intervals between checkpoints?
>> >> That
>> >>>>>>> way, we would checkpoint *either* when the transaction buffers are
>> >>>> nearly
>> >>>>>>> full, *OR* whenever a certain number of commit intervals have
>> >> elapsed,
>> >>>>>>> whichever comes first?
>> >>>>>>>
>> >>>>>>> That certainly seems reasonable, although this re-ignites an
>> earlier
>> >>>>>>> debate about whether a config should be measured in "number of
>> commit
>> >>>>>>> intervals", instead of just an absolute time.
>> >>>>>>>
>> >>>>>>> FWIW, I realised that this issue is the reason I was pursuing the
>> >>>> Atomic
>> >>>>>>> Checkpoints, as it de-couples memtable flush from checkpointing,
>> >> which
>> >>>>>>> enables us to just checkpoint on every commit without any
>> performance
>> >>>>>>> impact. Atomic Checkpointing is definitely the "best" solution,
>> but
>> >>>> I'm not
>> >>>>>>> sure if this is enough to bring it back into this KIP.
>> >>>>>>>
>> >>>>>>> I'm currently working on moving all the transactional logic
>> directly
>> >>>> into
>> >>>>>>> RocksDBStore itself, which does away with the
>> >> StateStore#newTransaction
>> >>>>>>> method, and reduces the number of new classes introduced,
>> >> significantly
>> >>>>>>> reducing the complexity. If it works, and the complexity is
>> >> drastically
>> >>>>>>> reduced, I may try bringing back Atomic Checkpoints into this KIP.
>> >>>>>>>
>> >>>>>>> Regards,
>> >>>>>>> Nick
>> >>>>>>>
>> >>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org>
>> >> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Nick,
>> >>>>>>>>
>> >>>>>>>> Thanks for the insights! Very interesting!
>> >>>>>>>>
>> >>>>>>>> As far as I understand, you want to atomically update the state
>> >> store
>> >>>>>>>> from the transaction buffer, flush the memtable of a state store
>> and
>> >>>>>>>> write the checkpoint not after the commit time elapsed but after
>> the
>> >>>>>>>> transaction buffer reached a size that would lead to exceeding
>> >>>>>>>> statestore.transaction.buffer.max.bytes before the next commit
>> >>>> interval
>> >>>>>>>> ends.
>> >>>>>>>> That means, the Kafka transaction would commit every commit
>> interval
>> >>>> but
>> >>>>>>>> the state store will only be atomically updated roughly every
>> >>>>>>>> statestore.transaction.buffer.max.bytes of data. Also IQ would
>> then
>> >>>> only
>> >>>>>>>> see new data roughly every
>> statestore.transaction.buffer.max.bytes.
>> >>>>>>>> After a failure the state store needs to restore up to
>> >>>>>>>> statestore.transaction.buffer.max.bytes.
>> >>>>>>>>
>> >>>>>>>> Is this correct?
>> >>>>>>>>
>> >>>>>>>> What about atomically updating the state store from the
>> transaction
>> >>>>>>>> buffer every commit interval and writing the checkpoint (thus,
>> >>>> flushing
>> >>>>>>>> the memtable) every configured amount of data and/or number of
>> >> commit
>> >>>>>>>> intervals? In such a way, we would have the same delay for
>> records
>> >>>>>>>> appearing in output topics and IQ because both would appear when
>> the
>> >>>>>>>> Kafka transaction is committed. However, after a failure the
>> state
>> >>>> store
>> >>>>>>>> still needs to restore up to
>> statestore.transaction.buffer.max.bytes
>> >>>> and
>> >>>>>>>> it might restore data that is already in the state store because
>> the
>> >>>>>>>> checkpoint lags behind the last stable offset (i.e. the last
>> >> committed
>> >>>>>>>> offset) of the changelog topics. Restoring data that is already
>> in
>> >> the
>> >>>>>>>> state store is idempotent, so EOS should not be violated.
>> >>>>>>>> This solution needs at least one new config to specify when a
>> >>>> checkpoint
>> >>>>>>>> should be written.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> A small correction to your previous e-mail that does not change anything
>> >>>>>>>> you said: under ALOS the default commit interval is 30 seconds, not five
>> >>>>>>>> seconds.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Bruno
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
>> >>>>>>>>> Hi everyone,
>> >>>>>>>>>
>> >>>>>>>>> I've begun performance testing my branch on our staging
>> >> environment,
>> >>>>>>>>> putting it through its paces in our non-trivial application. I'm
>> >>>>>>>> already
>> >>>>>>>>> observing the same increased flush rate that we saw the last
>> time
>> >> we
>> >>>>>>>>> attempted to use a version of this KIP, but this time, I think I
>> >> know
>> >>>>>>>> why.
>> >>>>>>>>>
>> >>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end
>> of
>> >> the
>> >>>>>>>> Task
>> >>>>>>>>> commit process, has the following behaviour:
>> >>>>>>>>>
>> >>>>>>>>>        - Under ALOS: checkpoint the state stores. This includes
>> >>>>>>>>>        flushing memtables in RocksDB. This is acceptable
>> because the
>> >>>>>>>> default
>> >>>>>>>>>        commit.interval.ms is 5 seconds, so forcibly flushing
>> >> memtables
>> >>>>>>>> every 5
>> >>>>>>>>>        seconds is acceptable for most applications.
>> >>>>>>>>>        - Under EOS: checkpointing is not done, *unless* it's
>> being
>> >>>>>>>> forced, due
>> >>>>>>>>>        to e.g. the Task closing or being revoked. This means
>> that
>> >> under
>> >>>>>>>> normal
>> >>>>>>>>>        processing conditions, the state stores will not be
>> >>>> checkpointed,
>> >>>>>>>> and will
>> >>>>>>>>>        not have memtables flushed at all, unless RocksDB
>> decides to
>> >>>>>>>> flush them on
>> >>>>>>>>>        its own. Checkpointing stores and force-flushing their
>> >> memtables
>> >>>>>>>> is only
>> >>>>>>>>>        done when a Task is being closed.
>> >>>>>>>>>
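The pre-KIP-892 behaviour described in the two bullets above reduces to a single condition. The following is only a minimal sketch of that condition: the class and method names are hypothetical and are not taken from the Kafka Streams code base, and the `enforceCheckpoint` flag stands in for "the Task is closing or being revoked".

```java
/** Hypothetical illustration only; not the actual StreamTask implementation. */
final class PostCommitSketch {

    /**
     * Returns true when a Task commit should also checkpoint the stores
     * (and therefore force-flush the RocksDB memtables).
     *
     * @param eosEnabled        whether the application runs under EOS
     * @param enforceCheckpoint assumed flag: the Task is closing or being revoked
     */
    static boolean shouldCheckpoint(boolean eosEnabled, boolean enforceCheckpoint) {
        // ALOS: checkpoint on every commit. EOS: only when forced.
        return enforceCheckpoint || !eosEnabled;
    }

    public static void main(String[] args) {
        System.out.println("ALOS, normal commit: " + shouldCheckpoint(false, false));
        System.out.println("EOS, normal commit:  " + shouldCheckpoint(true, false));
        System.out.println("EOS, Task closing:   " + shouldCheckpoint(true, true));
    }
}
```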
>> >>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on at least *some*
>> >>>> normal
>> >>>>>>>>> Task commits, in order to write the RocksDB transaction buffers
>> to
>> >>>> the
>> >>>>>>>>> state stores, and to ensure the offsets are synced to disk to
>> >> prevent
>> >>>>>>>>> restores from getting out of hand. Consequently, my current
>> >>>>>>>> implementation
>> >>>>>>>>> calls maybeCheckpoint on *every* Task commit, which is far too
>> >>>>>>>> frequent.
>> >>>>>>>>> This causes checkpoints every 10,000 records, which is a change
>> in
>> >>>>>>>> flush
>> >>>>>>>>> behaviour, potentially causing performance problems for some
>> >>>>>>>> applications.
>> >>>>>>>>>
>> >>>>>>>>> I'm looking into possible solutions, and I'm currently leaning
>> >>>> towards
>> >>>>>>>>> using the statestore.transaction.buffer.max.bytes configuration
>> to
>> >>>>>>>>> checkpoint Tasks once we are likely to exceed it. This would
>> >>>>>>>> complement the
>> >>>>>>>>> existing "early Task commit" functionality that this
>> configuration
>> >>>>>>>>> provides, in the following way:
>> >>>>>>>>>
>> >>>>>>>>>        - Currently, we use
>> statestore.transaction.buffer.max.bytes
>> >> to
>> >>>>>>>> force an
>> >>>>>>>>>        early Task commit if processing more records would cause
>> our
>> >>>> state
>> >>>>>>>> store
>> >>>>>>>>>        transactions to exceed the memory assigned to them.
>> >>>>>>>>>        - New functionality: when a Task *does* commit, we will
>> not
>> >>>>>>>> checkpoint
>> >>>>>>>>>        the stores (and hence flush the transaction buffers)
>> unless
>> >> we
>> >>>>>>>> expect to
>> >>>>>>>>>        cross the statestore.transaction.buffer.max.bytes
>> threshold
>> >>>> before
>> >>>>>>>> the next
>> >>>>>>>>>        commit
>> >>>>>>>>>
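The two uses of statestore.transaction.buffer.max.bytes listed above could be sketched roughly as follows. This is an illustration under stated assumptions, not the implementation on the branch: the class name, both method names, and the idea of using the bytes written since the last commit as the estimate for the next commit interval are all hypothetical.

```java
/** Hypothetical illustration only; names and the estimation heuristic are assumptions. */
final class CheckpointPolicySketch {

    // Stands in for the value of statestore.transaction.buffer.max.bytes.
    private final long maxBufferBytes;

    CheckpointPolicySketch(long maxBufferBytes) {
        if (maxBufferBytes <= 0) {
            throw new IllegalArgumentException("maxBufferBytes must be positive");
        }
        this.maxBufferBytes = maxBufferBytes;
    }

    /** Existing behaviour: commit early if buffering the next record would exceed the limit. */
    boolean shouldCommitEarly(long uncommittedBytes, long nextRecordBytes) {
        // Equivalent to uncommittedBytes + nextRecordBytes > maxBufferBytes, without overflow.
        return uncommittedBytes > maxBufferBytes - nextRecordBytes;
    }

    /**
     * Proposed behaviour: on a regular commit, checkpoint (and flush the buffers) only if
     * the buffers are expected to cross the limit before the next commit.
     * Assumption: the next interval writes about as many bytes as the last one did.
     */
    boolean shouldCheckpoint(long uncommittedBytes, long bytesSinceLastCommit) {
        return uncommittedBytes > maxBufferBytes - bytesSinceLastCommit;
    }

    public static void main(String[] args) {
        CheckpointPolicySketch policy = new CheckpointPolicySketch(1000L);
        System.out.println("checkpoint at 100 + 200? " + policy.shouldCheckpoint(100L, 200L));
        System.out.println("checkpoint at 700 + 400? " + policy.shouldCheckpoint(700L, 400L));
    }
}
```

With this shape, commits that leave plenty of headroom skip the checkpoint, so the flush rate is driven by buffer growth rather than by the commit frequency.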
>> >>>>>>>>> I'm also open to suggestions.
>> >>>>>>>>>
>> >>>>>>>>> Regards,
>> >>>>>>>>> Nick
>> >>>>>>>>>
>> >>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
>> nick.telf...@gmail.com
>> >>>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Bruno!
>> >>>>>>>>>>
>> >>>>>>>>>> 3.
>> >>>>>>>>>> By "less predictable for users", I meant in terms of
>> understanding
>> >>>> the
>> >>>>>>>>>> performance profile under various circumstances. The more
>> complex
>> >>>> the
>> >>>>>>>>>> solution, the more difficult it would be for users to
>> understand
>> >> the
>> >>>>>>>>>> performance they see. For example, spilling records to disk
>> when
>> >> the
>> >>>>>>>>>> transaction buffer reaches a threshold would, I expect, reduce
>> >> write
>> >>>>>>>>>> throughput. This reduction in write throughput could be
>> >> unexpected,
>> >>>>>>>> and
>> >>>>>>>>>> potentially difficult to diagnose/understand for users.
>> >>>>>>>>>> At the moment, I think the "early commit" concept is relatively
>> >>>>>>>>>> straightforward; it's easy to document, and conceptually fairly
>> >>>>>>>> obvious to
>> >>>>>>>>>> users. We could probably add a metric to make it easier to
>> >>>> understand
>> >>>>>>>> when
>> >>>>>>>>>> it happens though.
>> >>>>>>>>>>
>> >>>>>>>>>> 3. (the second one)
>> >>>>>>>>>> The IsolationLevel is *essentially* an indirect way of telling
>> >>>>>>>> StateStores
>> >>>>>>>>>> whether they should be transactional. READ_COMMITTED
>> essentially
>> >>>>>>>> requires
>> >>>>>>>>>> transactions, because it dictates that two threads calling
>> >>>>>>>>>> `newTransaction()` should not see writes from the other
>> >> transaction
>> >>>>>>>> until
>> >>>>>>>>>> they have been committed. With READ_UNCOMMITTED, all bets are
>> off,
>> >>>> and
>> >>>>>>>>>> stores can allow threads to observe written records at any
>> time,
>> >>>>>>>> which is
>> >>>>>>>>>> essentially "no transactions". That said, StateStores are free
>> to
>> >>>>>>>> implement
>> >>>>>>>>>> these guarantees however they can, which is a bit more relaxed
>> >> than
>> >>>>>>>>>> dictating "you must use transactions". For example, with
>> RocksDB
>> >> we
>> >>>>>>>> would
>> >>>>>>>>>> implement these as READ_COMMITTED == WBWI-based "transactions",
>> >>>>>>>>>> READ_UNCOMMITTED == direct writes to the database. But with
>> other
>> >>>>>>>> storage
>> >>>>>>>>>> engines, it might be preferable to *always* use transactions,
>> even
>> >>>>>>>> when
>> >>>>>>>>>> unnecessary; or there may be storage engines that don't provide
>> >>>>>>>>>> transactions, but the isolation guarantees can be met using a
>> >>>>>>>> different
>> >>>>>>>>>> technique.
>> >>>>>>>>>> My idea was to try to keep the StateStore interface as loosely
>> >>>> coupled
>> >>>>>>>>>> from the Streams engine as possible, to give implementers more
>> >>>>>>>> freedom, and
>> >>>>>>>>>> reduce the amount of internal knowledge required.
>> >>>>>>>>>> That said, I understand that "IsolationLevel" might not be the
>> >> right
>> >>>>>>>>>> abstraction, and we can always make it much more explicit if
>> >>>>>>>> required, e.g.
>> >>>>>>>>>> boolean transactional()
>> >>>>>>>>>>
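The mapping from IsolationLevel to store behaviour described above can be illustrated with a toy in-memory store. This is a sketch under assumptions, not RocksDBStore: the class is hypothetical, the nested enum is a local stand-in for the real IsolationLevel, a HashMap plays the role of the database, and a second HashMap plays the role of the WBWI transaction buffer.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy store; illustrates the isolation guarantees only. */
final class ToyIsolationStore {

    // Local stand-in for the real IsolationLevel enum.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    private final IsolationLevel level;
    private final Map<String, String> database = new HashMap<>(); // committed data
    private final Map<String, String> buffer = new HashMap<>();   // uncommitted writes

    ToyIsolationStore(IsolationLevel level) {
        this.level = level;
    }

    /** Write path used by the processing thread. */
    void put(String key, String value) {
        if (level == IsolationLevel.READ_COMMITTED) {
            buffer.put(key, value);   // staged until commit()
        } else {
            database.put(key, value); // direct write, visible immediately
        }
    }

    /** Read path used by another thread, e.g. an interactive query. */
    String queryGet(String key) {
        return database.get(key);
    }

    /** Makes all staged writes visible to queries. */
    void commit() {
        database.putAll(buffer);
        buffer.clear();
    }

    public static void main(String[] args) {
        ToyIsolationStore store = new ToyIsolationStore(IsolationLevel.READ_COMMITTED);
        store.put("k", "v");
        System.out.println("before commit: " + store.queryGet("k"));
        store.commit();
        System.out.println("after commit:  " + store.queryGet("k"));
    }
}
```

A storage engine without native transactions could satisfy the same contract with a different mechanism, which is the flexibility the IsolationLevel abstraction is meant to preserve.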
>> >>>>>>>>>> 7-8.
>> >>>>>>>>>> I can make these changes either later today or tomorrow.
>> >>>>>>>>>>
>> >>>>>>>>>> Small update:
>> >>>>>>>>>> I've rebased my branch on trunk and fixed a bunch of issues
>> that
>> >>>>>>>> needed
>> >>>>>>>>>> addressing. Currently, all the tests pass, which is promising,
>> but
>> >>>> it
>> >>>>>>>> will
>> >>>>>>>>>> need to undergo some performance testing. I haven't (yet)
>> worked
>> >> on
>> >>>>>>>>>> removing the `newTransaction()` stuff, but I would expect that,
>> >>>>>>>>>> behaviourally, it should make no difference. The branch is
>> >> available
>> >>>>>>>> at
>> >>>>>>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone
>> is
>> >>>>>>>>>> interested in taking an early look.
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Nick
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <
>> cado...@apache.org>
>> >>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Nick,
>> >>>>>>>>>>>
>> >>>>>>>>>>> 1.
>> >>>>>>>>>>> Yeah, I agree with you. That was actually also my point. I
>> >>>> understood
>> >>>>>>>>>>> that John was proposing the ingestion path as a way to avoid
>> the
>> >>>>>>>> early
>> >>>>>>>>>>> commits. Probably, I misinterpreted the intent.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 2.
>> >>>>>>>>>>> I agree with John here, that actually it is public API. My
>> >> question
>> >>>>>>>> is
>> >>>>>>>>>>> how this usage pattern affects normal processing.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 3.
>> >>>>>>>>>>> My concern is that checking for the size of the transaction
>> >> buffer
>> >>>>>>>> and
>> >>>>>>>>>>> maybe triggering an early commit affects the whole processing
>> of
>> >>>>>>>> Kafka
>> >>>>>>>>>>> Streams. The transactionality of a state store is not
>> confined to
>> >>>> the
>> >>>>>>>>>>> state store itself, but spills over and changes the behavior
>> of
>> >>>> other
>> >>>>>>>>>>> parts of the system. I agree with you that it is a decent
>> >>>>>>>> compromise. I
>> >>>>>>>>>>> just wanted to analyse the downsides and list the options to
>> >>>> overcome
>> >>>>>>>>>>> them. I also agree with you that all options seem quite heavy
>> >>>>>>>> compared
>> >>>>>>>>>>> with your KIP. I do not understand what you mean by "less
>> >>>>>>>> predictable
>> >>>>>>>>>>> for users", though.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> I found the discussions about the alternatives really
>> >> interesting.
>> >>>>>>>> But I
>> >>>>>>>>>>> also think that your plan sounds good and we should continue
>> with
>> >>>> it!
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Some comments on your reply to my e-mail on June 20th:
>> >>>>>>>>>>>
>> >>>>>>>>>>> 3.
>> >>>>>>>>>>> Ah, now, I understand the reasoning behind putting isolation
>> >> level
>> >>>> in
>> >>>>>>>>>>> the state store context. Thanks! Should that also be a way to
>> >> give
>> >>>>>>>> the
>> >>>>>>>>>>> state store the opportunity to decide whether to turn on
>> >>>>>>>>>>> transactions or not?
>> >>>>>>>>>>> With my comment, I was more concerned about how you know
>> if a
>> >>>>>>>>>>> checkpoint file needs to be written under EOS, if you do not
>> >> have a
>> >>>>>>>> way
>> >>>>>>>>>>> to know if the state store is transactional or not. If a state
>> >>>> store
>> >>>>>>>> is
>> >>>>>>>>>>> transactional, the checkpoint file can be written during
>> normal
>> >>>>>>>>>>> processing under EOS. If the state store is not transactional,
>> >> the
>> >>>>>>>>>>> checkpoint file must not be written under EOS.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 7.
>> >>>>>>>>>>> My point was about not only considering the bytes in memory in
>> >>>> config
>> >>>>>>>>>>> statestore.uncommitted.max.bytes, but also bytes that might be
>> >>>>>>>> spilled
>> >>>>>>>>>>> on disk. Basically, I was wondering whether you should remove
>> the
>> >>>>>>>>>>> "memory" in "Maximum number of memory bytes to be used to
>> >>>>>>>>>>> buffer uncommitted state-store records." My thinking was that
>> >> even
>> >>>>>>>> if a
>> >>>>>>>>>>> state store spills uncommitted bytes to disk, limiting the
>> >> overall
>> >>>>>>>> bytes
>> >>>>>>>>>>> might make sense. Thinking about it again and considering the
>> >>>> recent
>> >>>>>>>>>>> discussions, it does not make too much sense anymore.
>> >>>>>>>>>>> I like the name statestore.transaction.buffer.max.bytes that
>> you
>> >>>>>>>> proposed.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 8.
>> >>>>>>>>>>> A high-level description (without implementation details) of
>> how
>> >>>>>>>> Kafka
>> >>>>>>>>>>> Streams will manage the commit of changelog transactions,
>> state
>> >>>> store
>> >>>>>>>>>>> transactions and checkpointing would be great. Would be great
>> if
>> >>>> you
>> >>>>>>>>>>> could also add some sentences about the behavior in case of a
>> >>>>>>>> failure.
>> >>>>>>>>>>> For instance how does a transactional state store recover
>> after a
>> >>>>>>>>>>> failure or what happens with the transaction buffer, etc.
>> (that
>> >> is
>> >>>>>>>> what
>> >>>>>>>>>>> I meant by "fail-over" in point 9.)
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best,
>> >>>>>>>>>>> Bruno
>> >>>>>>>>>>>
>> >>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
>> >>>>>>>>>>>> Hi Bruno,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 1.
>> >>>>>>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex
>> >>>>>>>> transactions
>> >>>>>>>>>>>> have, whereby exceeding (or likely to exceed) configured
>> memory
>> >>>>>>>> needs to
>> >>>>>>>>>>>> trigger an early commit?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 2.
>> >>>>>>>>>>>> This is one of my big concerns. Ultimately, any approach
>> based
>> >> on
>> >>>>>>>>>>> cracking
>> >>>>>>>>>>>> open RocksDB internals and using it in ways it's not really
>> >>>> designed
>> >>>>>>>>>>> for is
>> >>>>>>>>>>>> likely to have some unforeseen performance or consistency
>> issues.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 3.
>> >>>>>>>>>>>> What's your motivation for removing these early commits?
>> While
>> >> not
>> >>>>>>>>>>> ideal, I
>> >>>>>>>>>>>> think they're a decent compromise to ensure consistency
>> whilst
>> >>>>>>>>>>> maintaining
>> >>>>>>>>>>>> good and predictable performance.
>> >>>>>>>>>>>> All 3 of your suggested ideas seem *very* complicated, and
>> might
>> >>>>>>>>>>> actually
>> >>>>>>>>>>>> make behaviour less predictable for users as a consequence.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I'm a bit concerned that the scope of this KIP is growing a
>> bit
>> >>>> out
>> >>>>>>>> of
>> >>>>>>>>>>>> control. While it's good to discuss ideas for future
>> >>>> improvements, I
>> >>>>>>>>>>> think
>> >>>>>>>>>>>> it's important to narrow the scope down to a design that
>> >> achieves
>> >>>>>>>> the
>> >>>>>>>>>>> most
>> >>>>>>>>>>>> pressing objectives (constant sized restorations during dirty
>> >>>>>>>>>>>> close/unexpected errors). Any design that this KIP produces
>> can
>> >>>>>>>>>>> ultimately
>> >>>>>>>>>>>> be changed in the future, especially if the bulk of it is
>> >> internal
>> >>>>>>>>>>>> behaviour.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I'm going to spend some time next week trying to re-work the
>> >>>>>>>> original
>> >>>>>>>>>>>> WriteBatchWithIndex design to remove the newTransaction()
>> >> method,
>> >>>>>>>> such
>> >>>>>>>>>>> that
>> >>>>>>>>>>>> it's just an implementation detail of RocksDBStore. That
>> way, if
>> >>>> we
>> >>>>>>>>>>> want to
>> >>>>>>>>>>>> replace WBWI with something in the future, like the SST file
>> >>>>>>>> management
>> >>>>>>>>>>>> outlined by John, then we can do so with little/no API
>> changes.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Nick
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
