Hi everyone,

I've updated the KIP to reflect the latest version of the design:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores

There are several changes in there that reflect feedback from this thread,
and there's a new section and a bunch of interface changes relating to
Atomic Checkpointing, which is the final piece of the puzzle to making
everything robust.

Let me know what you think!

Regards,
Nick

On Tue, 3 Jan 2023 at 11:33, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Lucas,
>
> Thanks for looking over my KIP.
>
> A) The bound is per-instance, not per-Task. This was a typo in the KIP
> that I've now corrected. It was originally per-Task, but I changed it to
> per-instance for exactly the reason you highlighted.
> B) It's worth noting that transactionality is only enabled under EOS, and
> in the default mode of operation (ALOS), there should be no change in
> behavior at all. I think, under EOS, we can mitigate the impact on users by
> sufficiently low default values for the memory bound configuration. I
> understand your hesitation to include a significant change of behaviour,
> especially in a minor release, but I suspect that most users will prefer
> the memory impact (under EOS) to the existing behaviour of frequent state
> restorations! If this is a problem, the changes can wait until the next
> major release. I'll be running a patched version of streams in production
> with these changes as soon as they're ready, so it won't disrupt me :-D
> C) The main purpose of this sentence was just to note that some changes
> will need to be made to the way Segments are handled in order to ensure
> they also benefit from transactions. At the time I wrote it, I hadn't
> figured out the specific changes necessary, so it was deliberately vague.
> This is the one outstanding problem I'm currently working on, and I'll
> update this section with more detail once I have figured out the exact
> changes required.
> D) newTransaction() provides the necessary isolation guarantees. While
> the RocksDB implementation of transactions doesn't technically *need*
> read-only users to call newTransaction(), other implementations (e.g. a
> hypothetical PostgresStore) may require it. Calling newTransaction() when
> no transaction is necessary is essentially free, as it will just return
> this.
>
> I didn't do any profiling of the KIP-844 PoC, but I think it should be
> fairly obvious where the performance problems stem from: writes under
> KIP-844 require 3 extra memory-copies: 1 to encode it with the
> tombstone/record flag, 1 to decode it from the tombstone/record flag, and 1
> to copy the record from the "temporary" store to the "main" store, when the
> transaction commits. The different approach taken by KIP-869 should perform
> much better, as it avoids all these copies, and may actually perform
> slightly better than trunk, due to batched writes in RocksDB performing
> better than non-batched writes.[1]
>
> Regards,
> Nick
>
> 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>
> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <lbruts...@confluent.io.invalid>
> wrote:
>
>> Hi Nick,
>>
>> I'm just starting to read up on the whole discussion about KIP-892 and
>> KIP-844. Thanks a lot for your work on this, I do think
>> `WriteBatchWithIndex` may be the way to go here. I do have some
>> questions about the latest draft.
>>
>>  A) If I understand correctly, you propose to put a bound on the
>> (native) memory consumed by each task. However, I wonder if this is
>> sufficient if we have temporary imbalances in the cluster. For
>> example, depending on the timing of rebalances during a cluster
>> restart, it could happen that a single streams node is assigned a lot
>> more tasks than expected. With your proposed change, this would mean
>> that the memory required by this one node could be a multiple of what
>> is required during normal operation. I wonder if it wouldn't be safer
>> to put a global bound on the memory use, across all tasks.
>>  B) Generally, the memory concerns still give me the feeling that this
>> should not be enabled by default for all users in a minor release.
>>  C) In section "Transaction Management": the sentence "A similar
>> analogue will be created to automatically manage `Segment`
>> transactions.". Maybe this is just me lacking some background, but I
>> do not understand this, it would be great if you could clarify what
>> you mean here.
>>  D) Could you please clarify why IQ has to call newTransaction(), when
>> it's read-only.
>>
>> And one last thing not strictly related to your KIP: if there is an
>> easy way for you to find out why the KIP-844 PoC is 20x slower (e.g.
>> by providing a flame graph), that would be quite interesting.
>>
>> Cheers,
>> Lucas
>>
>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <nick.telf...@gmail.com>
>> wrote:
>> >
>> > Hi everyone,
>> >
>> > I've updated the KIP with a more detailed design, which reflects the
>> > implementation I've been working on:
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> >
>> > This new design should address the outstanding points already made in
>> the
>> > thread.
>> >
>> > Please let me know if there are areas that are unclear or need more
>> > clarification.
>> >
>> > I have a (nearly) working implementation. I'm confident that the
>> remaining
>> > work (making Segments behave) will not impact the documented design.
>> >
>> > Regards,
>> >
>> > Nick
>> >
>> > On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io> wrote:
>> >
>> > > Nick,
>> > >
>> > > Thank you for the reply; that makes sense. I was hoping that, since
>> reading
>> > > uncommitted records from IQ in EOS isn't part of the documented API,
>> maybe
>> > > you *wouldn't* have to wait for the next major release to make that
>> change;
>> > > but given that it would be considered a major change, I like your
>> approach
>> > > the best.
>> > >
>> > > Wishing you a speedy recovery and happy coding!
>> > >
>> > > Thanks,
>> > > Colt McNealy
>> > > *Founder, LittleHorse.io*
>> > >
>> > >
>> > > On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <nick.telf...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Colt,
>> > > >
>> > > > 10: Yes, I agree it's not ideal. I originally intended to try to
>> keep the
>> > > > behaviour unchanged as much as possible, otherwise we'd have to
>> wait for
>> > > a
>> > > > major version release to land these changes.
>> > > > 20: Good point, ALOS doesn't need the same level of guarantee, and
>> the
>> > > > typically longer commit intervals would be problematic when reading
>> only
>> > > > "committed" records.
>> > > >
>> > > > I've been away for 5 days recovering from minor surgery, but I
>> spent a
>> > > > considerable amount of that time working through ideas for possible
>> > > > solutions in my head. I think your suggestion of keeping ALOS
>> as-is, but
>> > > > buffering writes for EOS is the right path forwards, although I
>> have a
>> > > > solution that both expands on this, and provides for some more
>> formal
>> > > > guarantees.
>> > > >
>> > > > Essentially, adding support to KeyValueStores for "Transactions",
>> with
>> > > > clearly defined IsolationLevels. Using "Read Committed" when under
>> EOS,
>> > > and
>> > > > "Read Uncommitted" under ALOS.
>> > > >
>> > > > The nice thing about this approach is that it gives us much more
>> clearly
>> > > > defined isolation behaviour that can be properly documented to
>> ensure
>> > > users
>> > > > know what to expect.
>> > > >
>> > > > I'm still working out the kinks in the design, and will update the
>> KIP
>> > > when
>> > > > I have something. The main struggle is trying to implement this
>> without
>> > > > making any major changes to the existing interfaces or breaking
>> existing
>> > > > implementations, because currently everything expects to operate
>> directly
>> > > > on a StateStore, and not a Transaction of that store. I think I'm
>> getting
>> > > > close, although sadly I won't be able to progress much until next
>> week
>> > > due
>> > > > to some work commitments.
>> > > >
>> > > > Regards,
>> > > > Nick
>> > > >
>> > > > On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io>
>> wrote:
>> > > >
>> > > > > Nick,
>> > > > >
>> > > > > Thank you for the explanation, and also for the updated KIP. I am
>> quite
>> > > > > eager for this improvement to be released as it would greatly
>> reduce
>> > > the
>> > > > > operational difficulties of EOS streams apps.
>> > > > >
>> > > > > Two questions:
>> > > > >
>> > > > > 10)
>> > > > > >When reading records, we will use the
>> > > > > WriteBatchWithIndex#getFromBatchAndDB
>> > > > >  and WriteBatchWithIndex#newIteratorWithBase utilities in order to
>> > > ensure
>> > > > > that uncommitted writes are available to query.
>> > > > > Why do extra work to enable the reading of uncommitted writes
>> during
>> > > IQ?
>> > > > > Code complexity aside, reading uncommitted writes is, in my
>> opinion, a
>> > > > > minor flaw in EOS IQ; it would be very nice to have the guarantee
>> that,
>> > > > > with EOS, IQ only reads committed records. In order to avoid dirty
>> > > reads,
>> > > > > one currently must query a standby replica (but this still doesn't
>> > > fully
>> > > > > guarantee monotonic reads).
>> > > > >
>> > > > > 20) Is it also necessary to enable this optimization on ALOS
>> stores?
>> > > The
>> > > > > motivation of KIP-844 was mainly to reduce the need to restore
>> state
>> > > from
>> > > > > scratch on unclean EOS shutdowns; with ALOS it was acceptable to
>> accept
>> > > > > that there may have been uncommitted writes on disk. On a side
>> note, if
>> > > > you
>> > > > > enable this type of store on ALOS processors, the community would
>> > > > > definitely want to enable queries on dirty reads; otherwise users
>> would
>> > > > > have to wait 30 seconds (default) to see an update.
>> > > > >
>> > > > > Thank you for doing this fantastic work!
>> > > > > Colt McNealy
>> > > > > *Founder, LittleHorse.io*
>> > > > >
>> > > > >
>> > > > > On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <
>> nick.telf...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi everyone,
>> > > > > >
>> > > > > > I've drastically reduced the scope of this KIP to no longer
>> include
>> > > the
>> > > > > > StateStore management of checkpointing. This can be added as a
>> KIP
>> > > > later
>> > > > > on
>> > > > > > to further optimize the consistency and performance of state
>> stores.
>> > > > > >
>> > > > > > I've also added a section discussing some of the concerns around
>> > > > > > concurrency, especially in the presence of Iterators. I'm
>> thinking of
>> > > > > > wrapping WriteBatchWithIndex with a reference-counting
>> copy-on-write
>> > > > > > implementation (that only makes a copy if there's an active
>> > > iterator),
>> > > > > but
>> > > > > > I'm open to suggestions.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Nick
>> > > > > >
>> > > > > > On Mon, 28 Nov 2022 at 16:36, Nick Telford <
>> nick.telf...@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Colt,
>> > > > > > >
>> > > > > > > I didn't do any profiling, but the 844 implementation:
>> > > > > > >
>> > > > > > >    - Writes uncommitted records to a temporary RocksDB
>> instance
>> > > > > > >       - Since tombstones need to be flagged, all record
>> values are
>> > > > > > >       prefixed with a value/tombstone marker. This
>> necessitates a
>> > > > > memory
>> > > > > > copy.
>> > > > > > >    - On-commit, iterates all records in this temporary
>> instance and
>> > > > > > >    writes them to the main RocksDB store.
>> > > > > > >    - While iterating, the value/tombstone marker needs to be
>> parsed
>> > > > and
>> > > > > > >    the real value extracted. This necessitates another memory
>> copy.
>> > > > > > >
>> > > > > > > My guess is that the cost of iterating the temporary RocksDB
>> store
>> > > is
>> > > > > the
>> > > > > > > major factor, with the 2 extra memory copies per-Record
>> > > contributing
>> > > > a
>> > > > > > > significant amount too.
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Nick
>> > > > > > >
>> > > > > > > On Mon, 28 Nov 2022 at 16:12, Colt McNealy <
>> c...@littlehorse.io>
>> > > > > wrote:
>> > > > > > >
>> > > > > > >> Hi all,
>> > > > > > >>
>> > > > > > >> Out of curiosity, why does the performance of the store
>> degrade so
>> > > > > > >> significantly with the 844 implementation? I wouldn't be too
>> > > > surprised
>> > > > > > by
>> > > > > > >> a
>> > > > > > >> 50-60% drop (caused by each record being written twice), but
>> 96%
>> > > is
>> > > > > > >> extreme.
>> > > > > > >>
>> > > > > > >> The only thing I can think of which could create such a
>> bottleneck
>> > > > > would
>> > > > > > >> be
>> > > > > > >> that perhaps the 844 implementation deserializes and then
>> > > > > re-serializes
>> > > > > > >> the
>> > > > > > >> store values when copying from the uncommitted to committed
>> store,
>> > > > > but I
>> > > > > > >> wasn't able to figure that out when I scanned the PR.
>> > > > > > >>
>> > > > > > >> Colt McNealy
>> > > > > > >> *Founder, LittleHorse.io*
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <
>> > > > nick.telf...@gmail.com>
>> > > > > > >> wrote:
>> > > > > > >>
>> > > > > > >> > Hi everyone,
>> > > > > > >> >
>> > > > > > >> > I've updated the KIP to resolve all the points that have
>> been
>> > > > raised
>> > > > > > so
>> > > > > > >> > far, with one exception: the ALOS default commit interval
>> of 5
>> > > > > minutes
>> > > > > > >> is
>> > > > > > >> > likely to cause WriteBatchWithIndex memory to grow too
>> large.
>> > > > > > >> >
>> > > > > > >> > There's a couple of different things I can think of to
>> solve
>> > > this:
>> > > > > > >> >
>> > > > > > >> >    - We already have a memory/record limit in the KIP to
>> prevent
>> > > > OOM
>> > > > > > >> >    errors. Should we choose a default value for these? My
>> > > concern
>> > > > > here
>> > > > > > >> is
>> > > > > > >> > that
>> > > > > > >> >    anything we choose might seem rather arbitrary. We could
>> > > change
>> > > > > > >> >    its behaviour such that under ALOS, it only triggers the
>> > > commit
>> > > > > of
>> > > > > > >> the
>> > > > > > >> >    StateStore, but under EOS, it triggers a commit of the
>> Kafka
>> > > > > > >> > transaction.
>> > > > > > >> >    - We could introduce a separate `checkpoint.interval.ms`
>> to
>> > > > > allow
>> > > > > > >> ALOS
>> > > > > > >> >    to commit the StateStores more frequently than the
>> general
>> > > > > > >> >    commit.interval.ms? My concern here is that the
>> semantics of
>> > > > > this
>> > > > > > >> > config
>> > > > > > >> >    would depend on the processing.mode; under ALOS it would
>> > > allow
>> > > > > more
>> > > > > > >> >    frequently committing stores, whereas under EOS it
>> couldn't.
>> > > > > > >> >
>> > > > > > >> > Any better ideas?
>> > > > > > >> >
>> > > > > > >> > On Wed, 23 Nov 2022 at 16:25, Nick Telford <
>> > > > nick.telf...@gmail.com>
>> > > > > > >> wrote:
>> > > > > > >> >
>> > > > > > >> > > Hi Alex,
>> > > > > > >> > >
>> > > > > > >> > > Thanks for the feedback.
>> > > > > > >> > >
>> > > > > > >> > > I've updated the discussion of OOM issues by describing
>> how
>> > > > we'll
>> > > > > > >> handle
>> > > > > > >> > > it. Here's the new text:
>> > > > > > >> > >
>> > > > > > >> > > To mitigate this, we will automatically force a Task
>> commit if
>> > > > the
>> > > > > > >> total
>> > > > > > >> > >> uncommitted records returned by
>> > > > > > >> > >> StateStore#approximateNumUncommittedEntries()  exceeds a
>> > > > > threshold,
>> > > > > > >> > >> configured by max.uncommitted.state.entries.per.task;
>> or the
>> > > > > total
>> > > > > > >> > >> memory used for buffering uncommitted records returned
>> by
>> > > > > > >> > >> StateStore#approximateNumUncommittedBytes() exceeds the
>> > > > threshold
>> > > > > > >> > >> configured by max.uncommitted.state.bytes.per.task.
>> This will
>> > > > > > roughly
>> > > > > > >> > >> bound the memory required per-Task for buffering
>> uncommitted
>> > > > > > records,
>> > > > > > >> > >> irrespective of the commit.interval.ms, and will
>> effectively
>> > > > > bound
>> > > > > > >> the
>> > > > > > >> > >> number of records that will need to be restored in the
>> event
>> > > > of a
>> > > > > > >> > failure.
>> > > > > > >> > >>
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > > These limits will be checked in StreamTask#process and a
>> > > > premature
>> > > > > > >> commit
>> > > > > > >> > >> will be requested via Task#requestCommit().
>> > > > > > >> > >>
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > > Note that these new methods provide default
>> implementations
>> > > that
>> > > > > > >> ensure
>> > > > > > >> > >> existing custom stores and non-transactional stores
>> (e.g.
>> > > > > > >> > >> InMemoryKeyValueStore) do not force any early commits.
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > > I've chosen to have the StateStore expose approximations
>> of
>> > > its
>> > > > > > buffer
>> > > > > > >> > > size/count instead of opaquely requesting a commit in
>> order to
>> > > > > > >> delegate
>> > > > > > >> > the
>> > > > > > >> > > decision making to the Task itself. This enables Tasks
>> to look
>> > > > at
>> > > > > > >> *all*
>> > > > > > >> > of
>> > > > > > >> > > their StateStores, and determine whether an early commit
>> is
>> > > > > > necessary.
>> > > > > > >> > > Notably, it enables pre-Task thresholds, instead of
>> per-Store,
>> > > > > which
>> > > > > > >> > > prevents Tasks with many StateStores from using much more
>> > > memory
>> > > > > > than
>> > > > > > >> > Tasks
>> > > > > > >> > > with one StateStore. This makes sense, since commits are
>> done
>> > > > > > by-Task,
>> > > > > > >> > not
>> > > > > > >> > > by-Store.
>> > > > > > >> > >
>> > > > > > >> > > Prizes* for anyone who can come up with a better name
>> for the
>> > > > new
>> > > > > > >> config
>> > > > > > >> > > properties!
>> > > > > > >> > >
>> > > > > > >> > > Thanks for pointing out the potential performance issues
>> of
>> > > > WBWI.
>> > > > > > From
>> > > > > > >> > the
>> > > > > > >> > > benchmarks that user posted[1], it looks like WBWI still
>> > > > performs
>> > > > > > >> > > considerably better than individual puts, which is the
>> > > existing
>> > > > > > >> design,
>> > > > > > >> > so
>> > > > > > >> > > I'd actually expect a performance boost from WBWI, just
>> not as
>> > > > > great
>> > > > > > >> as
>> > > > > > >> > > we'd get from a plain WriteBatch. This does suggest that
>> a
>> > > good
>> > > > > > >> > > optimization would be to use a regular WriteBatch for
>> > > > restoration
>> > > > > > (in
>> > > > > > >> > > RocksDBStore#restoreBatch), since we know that those
>> records
>> > > > will
>> > > > > > >> never
>> > > > > > >> > be
>> > > > > > >> > > queried before they're committed.
>> > > > > > >> > >
>> > > > > > >> > > 1:
>> > > > > > >> >
>> > > > > >
>> > > >
>> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>> > > > > > >> > >
>> > > > > > >> > > * Just kidding, no prizes, sadly.
>> > > > > > >> > >
>> > > > > > >> > > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
>> > > > > > >> > > <asorokou...@confluent.io.invalid> wrote:
>> > > > > > >> > >
>> > > > > > >> > >> Hey Nick,
>> > > > > > >> > >>
>> > > > > > >> > >> Thank you for the KIP! With such a significant
>> performance
>> > > > > > >> degradation
>> > > > > > >> > in
>> > > > > > >> > >> the secondary store approach, we should definitely
>> consider
>> > > > > > >> > >> WriteBatchWithIndex. I also like encapsulating
>> checkpointing
>> > > > > inside
>> > > > > > >> the
>> > > > > > >> > >> default state store implementation to improve
>> performance.
>> > > > > > >> > >>
>> > > > > > >> > >> +1 to John's comment to keep the current checkpointing
>> as a
>> > > > > > fallback
>> > > > > > >> > >> mechanism. We want to keep existing users' workflows
>> intact
>> > > if
>> > > > we
>> > > > > > >> can. A
>> > > > > > >> > >> non-intrusive way would be to add a separate StateStore
>> > > method,
>> > > > > > say,
>> > > > > > >> > >> StateStore#managesCheckpointing(), that controls
>> whether the
>> > > > > state
>> > > > > > >> store
>> > > > > > >> > >> implementation owns checkpointing.
>> > > > > > >> > >>
>> > > > > > >> > >> I think that a solution to the transactional writes
>> should
>> > > > > address
>> > > > > > >> the
>> > > > > > >> > >> OOMEs. One possible way to address that is to wire
>> > > StateStore's
>> > > > > > >> commit
>> > > > > > >> > >> request by adding, say, StateStore#commitNeeded that is
>> > > checked
>> > > > > in
>> > > > > > >> > >> StreamTask#commitNeeded via the corresponding
>> > > > > > ProcessorStateManager.
>> > > > > > >> > With
>> > > > > > >> > >> that change, RocksDBStore will have to track the current
>> > > > > > transaction
>> > > > > > >> > size
>> > > > > > >> > >> and request a commit when the size goes over a
>> (configurable)
>> > > > > > >> threshold.
>> > > > > > >> > >>
>> > > > > > >> > >> AFAIU WriteBatchWithIndex might perform significantly
>> slower
>> > > > than
>> > > > > > >> > non-txn
>> > > > > > >> > >> puts as the batch size grows [1]. We should have a
>> > > > configuration
>> > > > > to
>> > > > > > >> fall
>> > > > > > >> > >> back to the current behavior (and/or disable txn stores
>> for
>> > > > ALOS)
>> > > > > > >> unless
>> > > > > > >> > >> the benchmarks show negligible overhead for longer
>> commits /
>> > > > > > >> > large-enough
>> > > > > > >> > >> batch sizes.
>> > > > > > >> > >>
>> > > > > > >> > >> If you prefer to keep the KIP smaller, I would rather
>> cut out
>> > > > > > >> > >> state-store-managed checkpointing rather than proper
>> OOMe
>> > > > > handling
>> > > > > > >> and
>> > > > > > >> > >> being able to switch to non-txn behavior. The
>> checkpointing
>> > > is
>> > > > > not
>> > > > > > >> > >> necessary to solve the recovery-under-EOS problem. On
>> the
>> > > other
>> > > > > > hand,
>> > > > > > >> > once
>> > > > > > >> > >> WriteBatchWithIndex is in, it will be much easier to add
>> > > > > > >> > >> state-store-managed checkpointing.
>> > > > > > >> > >>
>> > > > > > >> > >> If you share the current implementation, I am happy to
>> help
>> > > you
>> > > > > > >> address
>> > > > > > >> > >> the
>> > > > > > >> > >> OOMe and configuration parts as well as review and test
>> the
>> > > > > patch.
>> > > > > > >> > >>
>> > > > > > >> > >> Best,
>> > > > > > >> > >> Alex
>> > > > > > >> > >>
>> > > > > > >> > >>
>> > > > > > >> > >> 1. https://github.com/facebook/rocksdb/issues/608
>> > > > > > >> > >>
>> > > > > > >> > >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <
>> > > > > > nick.telf...@gmail.com
>> > > > > > >> >
>> > > > > > >> > >> wrote:
>> > > > > > >> > >>
>> > > > > > >> > >> > Hi John,
>> > > > > > >> > >> >
>> > > > > > >> > >> > Thanks for the review and feedback!
>> > > > > > >> > >> >
>> > > > > > >> > >> > 1. Custom Stores: I've been mulling over this problem
>> > > myself.
>> > > > > As
>> > > > > > it
>> > > > > > >> > >> stands,
>> > > > > > >> > >> > custom stores would essentially lose checkpointing
>> with no
>> > > > > > >> indication
>> > > > > > >> > >> that
>> > > > > > >> > >> > they're expected to make changes, besides a line in
>> the
>> > > > release
>> > > > > > >> > notes. I
>> > > > > > >> > >> > agree that the best solution would be to provide a
>> default
>> > > > that
>> > > > > > >> > >> checkpoints
>> > > > > > >> > >> > to a file. The one thing I would change is that the
>> > > > > checkpointing
>> > > > > > >> is
>> > > > > > >> > to
>> > > > > > >> > >> a
>> > > > > > >> > >> > store-local file, instead of a per-Task file. This
>> way the
>> > > > > > >> StateStore
>> > > > > > >> > >> still
>> > > > > > >> > >> > technically owns its own checkpointing (via a default
>> > > > > > >> implementation),
>> > > > > > >> > >> and
>> > > > > > >> > >> > the StateManager/Task execution engine doesn't need
>> to know
>> > > > > > >> anything
>> > > > > > >> > >> about
>> > > > > > >> > >> > checkpointing, which greatly simplifies some of the
>> logic.
>> > > > > > >> > >> >
>> > > > > > >> > >> > 2. OOME errors: The main reasons why I didn't explore
>> a
>> > > > > solution
>> > > > > > to
>> > > > > > >> > >> this is
>> > > > > > >> > >> > a) to keep this KIP as simple as possible, and b)
>> because
>> > > I'm
>> > > > > not
>> > > > > > >> > >> exactly
>> > > > > > >> > >> > how to signal that a Task should commit prematurely.
>> I'm
>> > > > > > confident
>> > > > > > >> > it's
>> > > > > > >> > >> > possible, and I think it's worth adding a section on
>> > > handling
>> > > > > > this.
>> > > > > > >> > >> Besides
>> > > > > > >> > >> > my proposal to force an early commit once memory usage
>> > > > reaches
>> > > > > a
>> > > > > > >> > >> threshold,
>> > > > > > >> > >> > is there any other approach that you might suggest for
>> > > > tackling
>> > > > > > >> this
>> > > > > > >> > >> > problem?
>> > > > > > >> > >> >
>> > > > > > >> > >> > 3. ALOS: I can add in an explicit paragraph, but my
>> > > > assumption
>> > > > > is
>> > > > > > >> that
>> > > > > > >> > >> > since transactional behaviour comes at little/no
>> cost, that
>> > > > it
>> > > > > > >> should
>> > > > > > >> > be
>> > > > > > >> > >> > available by default on all stores, irrespective of
>> the
>> > > > > > processing
>> > > > > > >> > mode.
>> > > > > > >> > >> > While ALOS doesn't use transactions, the Task itself
>> still
>> > > > > > >> "commits",
>> > > > > > >> > so
>> > > > > > >> > >> > the behaviour should be correct under ALOS too. I'm
>> not
>> > > > > convinced
>> > > > > > >> that
>> > > > > > >> > >> it's
>> > > > > > >> > >> > worth having both transactional/non-transactional
>> stores
>> > > > > > >> available, as
>> > > > > > >> > >> it
>> > > > > > >> > >> > would considerably increase the complexity of the
>> codebase,
>> > > > for
>> > > > > > >> very
>> > > > > > >> > >> little
>> > > > > > >> > >> > benefit.
>> > > > > > >> > >> >
>> > > > > > >> > >> > 4. Method deprecation: Are you referring to
>> > > > > > >> StateStore#getPosition()?
>> > > > > > >> > >> As I
>> > > > > > >> > >> > understand it, Position contains the position of the
>> > > *source*
>> > > > > > >> topics,
>> > > > > > >> > >> > whereas the commit offsets would be the *changelog*
>> > > offsets.
>> > > > So
>> > > > > > >> it's
>> > > > > > >> > >> still
>> > > > > > >> > >> > necessary to retain the Position data, as well as the
>> > > > changelog
>> > > > > > >> > offsets.
>> > > > > > >> > >> > What I meant in the KIP is that Position offsets are
>> > > > currently
>> > > > > > >> stored
>> > > > > > >> > >> in a
>> > > > > > >> > >> > file, and since we can atomically store metadata
>> along with
>> > > > the
>> > > > > > >> record
>> > > > > > >> > >> > batch we commit to RocksDB, we can move our Position
>> > > offsets
>> > > > in
>> > > > > > to
>> > > > > > >> > this
>> > > > > > >> > >> > metadata too, and gain the same transactional
>> guarantees
>> > > that
>> > > > > we
>> > > > > > >> will
>> > > > > > >> > >> for
>> > > > > > >> > >> > changelog offsets, ensuring that the Position offsets
>> are
>> > > > > > >> consistent
>> > > > > > >> > >> with
>> > > > > > >> > >> > the records that are read from the database.
>> > > > > > >> > >> >
>> > > > > > >> > >> > Regards,
>> > > > > > >> > >> > Nick
>> > > > > > >> > >> >
>> > > > > > >> > >> > On Tue, 22 Nov 2022 at 16:25, John Roesler <
>> > > > > vvcep...@apache.org>
>> > > > > > >> > wrote:
>> > > > > > >> > >> >
>> > > > > > >> > >> > > Thanks for publishing this alternative, Nick!
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > The benchmark you mentioned in the KIP-844
>> discussion
>> > > seems
>> > > > > > like
>> > > > > > >> a
>> > > > > > >> > >> > > compelling reason to revisit the built-in
>> > > transactionality
>> > > > > > >> > mechanism.
>> > > > > > >> > >> I
>> > > > > > >> > >> > > also appreciate you analysis, showing that for most
>> use
>> > > > > cases,
>> > > > > > >> the
>> > > > > > >> > >> write
>> > > > > > >> > >> > > batch approach should be just fine.
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > There are a couple of points that would hold me
>> back from
>> > > > > > >> approving
>> > > > > > >> > >> this
>> > > > > > >> > >> > > KIP right now:
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > 1. Loss of coverage for custom stores.
>> > > > > > >> > >> > > The fact that you can plug in a (relatively) simple
>> > > > > > >> implementation
>> > > > > > >> > of
>> > > > > > >> > >> the
>> > > > > > >> > >> > > XStateStore interfaces and automagically get a
>> > > distributed
>> > > > > > >> database
>> > > > > > >> > >> out
>> > > > > > >> > >> > of
>> > > > > > >> > >> > > it is a significant benefit of Kafka Streams. I'd
>> hate to
>> > > > > lose
>> > > > > > >> it,
>> > > > > > >> > so
>> > > > > > >> > >> it
>> > > > > > >> > >> > > would be better to spend some time and come up with
>> a way
>> > > > to
>> > > > > > >> > preserve
>> > > > > > >> > >> > that
>> > > > > > >> > >> > > property. For example, can we provide a default
>> > > > > implementation
>> > > > > > of
>> > > > > > >> > >> > > `commit(..)` that re-implements the existing
>> > > > checkpoint-file
>> > > > > > >> > >> approach? Or
>> > > > > > >> > >> > > perhaps add an `isTransactional()` flag to the state
>> > > store
>> > > > > > >> interface
>> > > > > > >> > >> so
>> > > > > > >> > >> > > that the runtime can decide whether to continue to
>> manage
>> > > > > > >> checkpoint
>> > > > > > >> > >> > files
>> > > > > > >> > >> > > vs delegating transactionality to the stores?
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > 2. Guarding against OOME
>> > > > > > >> > >> > > I appreciate your analysis, but I don't think it's
>> > > > sufficient
>> > > > > > to
>> > > > > > >> say
>> > > > > > >> > >> that
>> > > > > > >> > >> > > we will solve the memory problem later if it becomes
>> > > > > necessary.
>> > > > > > >> The
>> > > > > > >> > >> > > experience leading to that situation would be quite
>> bad:
>> > > > > > Imagine,
>> > > > > > >> > you
>> > > > > > >> > >> > > upgrade to AK 3.next, your tests pass, so you
>> deploy to
>> > > > > > >> production.
>> > > > > > >> > >> That
>> > > > > > >> > >> > > night, you get paged because your app is now
>> crashing
>> > > with
>> > > > > > >> OOMEs. As
>> > > > > > >> > >> with
>> > > > > > >> > >> > > all OOMEs, you'll have a really hard time finding
>> the
>> > > root
>> > > > > > cause,
>> > > > > > >> > and
>> > > > > > >> > >> > once
>> > > > > > >> > >> > > you do, you won't have a clear path to resolve the
>> issue.
>> > > > You
>> > > > > > >> could
>> > > > > > >> > >> only
>> > > > > > >> > >> > > tune down the commit interval and cache buffer size
>> until
>> > > > you
>> > > > > > >> stop
>> > > > > > >> > >> > getting
>> > > > > > >> > >> > > crashes.
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > FYI, I know of multiple cases where people run EOS
>> with
>> > > > much
>> > > > > > >> larger
>> > > > > > >> > >> > commit
>> > > > > > >> > >> > > intervals to get better batching than the default,
>> so I
>> > > > don't
>> > > > > > >> think
>> > > > > > >> > >> this
>> > > > > > >> > >> > > pathological case would be as rare as you suspect.
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > Given that we already have the rudiments of an idea
>> of
>> > > what
>> > > > > we
>> > > > > > >> could
>> > > > > > >> > >> do
>> > > > > > >> > >> > to
>> > > > > > >> > >> > > prevent this downside, we should take the time to
>> design
>> > > a
>> > > > > > >> solution.
>> > > > > > >> > >> We
>> > > > > > >> > >> > owe
>> > > > > > >> > >> > > it to our users to ensure that awesome new features
>> don't
>> > > > > come
>> > > > > > >> with
>> > > > > > >> > >> > bitter
>> > > > > > >> > >> > > pills unless we can't avoid it.
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > 3. ALOS mode.
>> > > > > > >> > >> > > On the other hand, I didn't see an indication of how
>> > > stores
>> > > > > > will
>> > > > > > >> be
>> > > > > > >> > >> > > handled under ALOS (aka non-EOS) mode.
>> Theoretically, the
>> > > > > > >> > >> > transactionality
>> > > > > > >> > >> > > of the store and the processing mode are
>> orthogonal. A
>> > > > > > >> transactional
>> > > > > > >> > >> > store
>> > > > > > >> > >> > > would serve ALOS just as well as a
>> non-transactional one
>> > > > (if
>> > > > > > not
>> > > > > > >> > >> better).
>> > > > > > >> > >> > > Under ALOS, though, the default commit interval is
>> five
>> > > > > > minutes,
>> > > > > > >> so
>> > > > > > >> > >> the
>> > > > > > >> > >> > > memory issue is far more pressing.
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > As I see it, we have several options to resolve this
>> > > point.
>> > > > > We
>> > > > > > >> could
>> > > > > > >> > >> > > demonstrate that transactional stores work just
>> fine for
>> > > > ALOS
>> > > > > > >> and we
>> > > > > > >> > >> can
>> > > > > > >> > >> > > therefore just swap over unconditionally. We could
>> also
>> > > > > disable
>> > > > > > >> the
>> > > > > > >> > >> > > transactional mechanism under ALOS so that stores
>> operate
>> > > > > just
>> > > > > > >> the
>> > > > > > >> > >> same
>> > > > > > >> > >> > as
>> > > > > > >> > >> > > they do today when run in ALOS mode. Finally, we
>> could do
>> > > > the
>> > > > > > >> same
>> > > > > > >> > as
>> > > > > > >> > >> in
>> > > > > > >> > >> > > KIP-844 and make transactional stores opt-in (it'd
>> be
>> > > > better
>> > > > > to
>> > > > > > >> > avoid
>> > > > > > >> > >> the
>> > > > > > >> > >> > > extra opt-in mechanism, but it's a good
>> > > > get-out-of-jail-free
>> > > > > > >> card).
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > 4. (minor point) Deprecation of methods
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > You mentioned that the new `commit` method replaces
>> > > flush,
>> > > > > > >> > >> > > updateChangelogOffsets, and checkpoint. It seems to
>> me
>> > > that
>> > > > > the
>> > > > > > >> > point
>> > > > > > >> > >> > about
>> > > > > > >> > >> > > atomicity and Position also suggests that it
>> replaces the
>> > > > > > >> Position
>> > > > > > >> > >> > > callbacks. However, the proposal only deprecates
>> `flush`.
>> > > > > > Should
>> > > > > > >> we
>> > > > > > >> > be
>> > > > > > >> > >> > > deprecating other methods as well?
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > Thanks again for the KIP! It's really nice that you
>> and
>> > > > Alex
>> > > > > > will
>> > > > > > >> > get
>> > > > > > >> > >> the
>> > > > > > >> > >> > > chance to collaborate on both directions so that we
>> can
>> > > get
>> > > > > the
>> > > > > > >> best
>> > > > > > >> > >> > > outcome for Streams and its users.
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > -John
>> > > > > > >> > >> > >
>> > > > > > >> > >> > >
>> > > > > > >> > >> > > On 2022/11/21 15:02:15 Nick Telford wrote:
>> > > > > > >> > >> > > > Hi everyone,
>> > > > > > >> > >> > > >
>> > > > > > >> > >> > > > As I mentioned in the discussion thread for
>> KIP-844,
>> > > I've
>> > > > > > been
>> > > > > > >> > >> working
>> > > > > > >> > >> > on
>> > > > > > >> > >> > > > an alternative approach to achieving better
>> > > transactional
>> > > > > > >> > semantics
>> > > > > > >> > >> for
>> > > > > > >> > >> > > > Kafka Streams StateStores.
>> > > > > > >> > >> > > >
>> > > > > > >> > >> > > > I've published this separately as KIP-892:
>> > > Transactional
>> > > > > > >> Semantics
>> > > > > > >> > >> for
>> > > > > > >> > >> > > > StateStores
>> > > > > > >> > >> > > > <
>> > > > > > >> > >> > >
>> > > > > > >> > >> >
>> > > > > > >> > >>
>> > > > > > >> >
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> > > > > > >> > >> > > >,
>> > > > > > >> > >> > > > so that it can be discussed/reviewed separately
>> from
>> > > > > KIP-844.
>> > > > > > >> > >> > > >
>> > > > > > >> > >> > > > Alex: I'm especially interested in what you think!
>> > > > > > >> > >> > > >
>> > > > > > >> > >> > > > I have a nearly complete implementation of the
>> changes
>> > > > > > >> outlined in
>> > > > > > >> > >> this
>> > > > > > >> > >> > > > KIP, please let me know if you'd like me to push
>> them
>> > > for
>> > > > > > >> review
>> > > > > > >> > in
>> > > > > > >> > >> > > advance
>> > > > > > >> > >> > > > of a vote.
>> > > > > > >> > >> > > >
>> > > > > > >> > >> > > > Regards,
>> > > > > > >> > >> > > >
>> > > > > > >> > >> > > > Nick
>> > > > > > >> > >> > > >
>> > > > > > >> > >> > >
>> > > > > > >> > >> >
>> > > > > > >> > >>
>> > > > > > >> > >
>> > > > > > >> >
>> > > > > > >>
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>>
>

Reply via email to