Nick,

Thanks for the response.

>Can you clarify how much state was restored in those 11 seconds?
That was a full restoration of ~650MB of state after I wiped the state
directory. The restoration after a crash with your branch is nearly
instantaneous, whereas with plain Kafka 3.5.0 a crash triggers a full
restoration (8 seconds).
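
For reference, here is the arithmetic behind the "30% or so" figure from my
earlier message, as a small Java sketch. The class and method names are made
up for illustration, and the inputs are just the rounded numbers from this
thread (~650MB, 8 seconds on 3.5.0, 11 seconds on the first KIP-892 build), so
treat the output as approximate.

```java
// Back-of-the-envelope restore arithmetic; all names here are illustrative.
final class RestoreArithmetic {

    // Restore throughput in MB/s for a given state size and duration.
    static double throughputMbPerSec(double stateMb, double seconds) {
        return stateMb / seconds;
    }

    // Extra wall-clock time of the candidate relative to the baseline, in percent.
    static double extraTimePercent(double baselineSec, double candidateSec) {
        return (candidateSec - baselineSec) / baselineSec * 100.0;
    }

    // Drop in throughput of the candidate relative to the baseline, in percent.
    static double throughputDropPercent(double baselineSec, double candidateSec) {
        return (1.0 - baselineSec / candidateSec) * 100.0;
    }

    public static void main(String[] args) {
        double stateMb = 650.0;     // approximate state size from the benchmark
        double baselineSec = 8.0;   // plain Streams 3.5.0
        double candidateSec = 11.0; // first KIP-892 build, before the fix

        System.out.printf("baseline:  %.1f MB/s%n", throughputMbPerSec(stateMb, baselineSec));
        System.out.printf("candidate: %.1f MB/s%n", throughputMbPerSec(stateMb, candidateSec));
        System.out.printf("extra time:      %.1f%%%n", extraTimePercent(baselineSec, candidateSec));
        System.out.printf("throughput drop: %.1f%%%n", throughputDropPercent(baselineSec, candidateSec));
    }
}
```

With those inputs it works out to roughly 81 MB/s versus 59 MB/s: about 37.5%
more wall-clock time, or about a 27% drop in restore throughput, depending on
which way you measure.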

Additionally, I pulled, rebuilt, and re-tested your changes and now the
restoration time with your branch is the same as with vanilla Streams!
Fantastic work!
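
In case it helps anyone skimming the thread, my understanding of the fix you
describe below (writing the changelog offset once per restored batch instead
of once per record, and caching the serialized TopicPartition) is roughly the
following. This is a toy model in plain Java, not the code from your branch:
`RestoreOffsetSketch`, `Batch`, and the helper methods are hypothetical names,
and the "serialization" is just a UTF-8 encoding of a string.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are hypothetical and do not mirror the branch.
final class RestoreOffsetSketch {

    // Cache of serialized topic-partition keys, so each key is encoded once.
    private static final Map<String, byte[]> KEY_CACHE = new HashMap<>();

    // Stand-in for serializing a TopicPartition (assumed to be the costly step).
    static byte[] serializeKey(String topicPartition) {
        return topicPartition.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] cachedKey(String topicPartition) {
        return KEY_CACHE.computeIfAbsent(topicPartition, RestoreOffsetSketch::serializeKey);
    }

    // Stand-in for a write batch: restored records plus offset entries.
    static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        final List<byte[][]> offsetEntries = new ArrayList<>();

        void putRecord(byte[] record) {
            records.add(record);
        }

        void writeOffset(byte[] topicPartitionKey, long offset) {
            byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
            offsetEntries.add(new byte[][] {topicPartitionKey, value});
        }

        int offsetWrites() {
            return offsetEntries.size();
        }

        // Offset stored by the most recent writeOffset call (requires at least one).
        long lastOffset() {
            byte[][] last = offsetEntries.get(offsetEntries.size() - 1);
            return ByteBuffer.wrap(last[1]).getLong();
        }
    }

    // Slow path: serialize the key and write an offset for every restored record.
    static Batch restorePerRecord(String tp, long firstOffset, List<byte[]> restored) {
        Batch batch = new Batch();
        long offset = firstOffset;
        for (byte[] record : restored) {
            batch.putRecord(record);
            batch.writeOffset(serializeKey(tp), offset++);
        }
        return batch;
    }

    // Fixed path: add all records, then write only the final offset, using the cached key.
    static Batch restorePerBatch(String tp, long firstOffset, List<byte[]> restored) {
        Batch batch = new Batch();
        for (byte[] record : restored) {
            batch.putRecord(record);
        }
        if (!restored.isEmpty()) {
            batch.writeOffset(cachedKey(tp), firstOffset + restored.size() - 1);
        }
        return batch;
    }

    public static void main(String[] args) {
        List<byte[]> restored = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            restored.add(new byte[] {(byte) i});
        }
        Batch slow = restorePerRecord("changelog-0", 100L, restored);
        Batch fast = restorePerBatch("changelog-0", 100L, restored);
        System.out.println("per-record offset writes: " + slow.offsetWrites());
        System.out.println("per-batch offset writes:  " + fast.offsetWrites());
    }
}
```

Both paths end up recording the same final offset; the per-batch version just
does it with a single write instead of one per record.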

I plan to do some more testing with larger state stores over the next
couple of weeks, both with RocksDB and Speedb OSS. I might even
try enabling some of the experimental Speedb OSS features, such as the
[Improved Write Flow](https://docs.speedb.io/speedb-features/write-flow).
As far as I understand, this isn't possible to do through the standard
RocksDBConfigSetter since some of the config options are Speedb-specific.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Sep 11, 2023 at 4:29 AM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Colt,
>
> Thanks for taking the time to run your benchmarks on this, that's
> incredibly helpful.
>
> > With KIP 892, I verified that unclean shutdown does not cause a fresh
> > restore (!!!!). I got the following benchmark results:
> > - Benchmark took 216 seconds
> > - 1,401 tasks per second on one partition
> > - 11 seconds to restore the state
>
> Can you clarify how much state was restored in those 11 seconds? Was this
> the time to do the full restore regardless, or was it the time to only
> restore a small fraction of the state (e.g. the last aborted transaction)?
>
> > -- QUESTION: Because we observed a significant (30% or so) and
> > reproducible slowdown during restoration, it seems like KIP-892 uses the
> > checkpointing behavior during restoration as well? If so, I would argue
> > that this might not be necessary, because everything we write is already
> > committed, so we don't need to change the behavior during restoration or
> > standby tasks. Perhaps we could write the offsets to RocksDB on every
> > batch (or even every 5 seconds or so).
>
> Restore has always used a completely separate code-path to regular writes,
> and continues to do so. I had a quick pass over the code and I suspect I
> know what's causing the performance degradation: for every restored record,
> I was adding the changelog offset of that record to the batch along with
> the record. This is different to the regular write-path, which only adds
> the current offsets once, on-commit. This writeOffset method is fairly
> expensive, since it has to serialize the TopicPartition and offset that's
> being written to the database.
>
> Assuming this is the cause, I've already pushed a fix to my branch that
> calls writeOffset only once per batch and also adds some caching to the
> serialization in writeOffset, which should also improve the performance of
> state commits in the normal write-path.
>
> Please let me know if this addresses the issue!
>
> Regards,
> Nick
>
>
> On Mon, 11 Sept 2023 at 05:38, Colt McNealy <c...@littlehorse.io> wrote:
>
> > Howdy folks,
> >
> > First I wanted to say fantastic work and thank you to Nick. I built your
> > branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did
> > some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0`
> > branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1.
> > And it worked! Including the global store (we don't have any segmented
> > stores, unfortunately).
> >
> > The test I ran involved running 3,000 workflows with 100 tasks each, and
> > roughly 650MB state total.
> >
> > With Streams 3.5.0, I indeed verified that unclean shutdown caused a
> > fresh restore from scratch. I also benchmarked my application at:
> > - Running the benchmark took 211 seconds
> > - 1,421 tasks per second on one partition
> > - 8 seconds to restore the state (650MB or so)
> >
> > With KIP 892, I verified that unclean shutdown does not cause a fresh
> > restore (!!!!). I got the following benchmark results:
> > - Benchmark took 216 seconds
> > - 1,401 tasks per second on one partition
> > - 11 seconds to restore the state
> >
> > I ran the restorations many times to ensure that there was no rounding
> > error or noise; the results were remarkably consistent. Additionally, I
> > ran the restorations with KIP-892 built with Speedb OSS. The restoration
> > time consistently came out as 10 seconds, which was an improvement from
> > the 11 seconds observed with RocksDB + KIP-892.
> >
> > My application is bottlenecked mostly by serialization and
> > deserialization, so improving the performance of the state store doesn't
> > really impact our throughput that much. And the processing performance
> > (benchmark time, tasks/second) is pretty close in KIP-892 vs Streams
> > 3.5.0. However, at larger state store sizes, RocksDB performance begins
> > to degrade, so that might not be true once we pass 20GB per partition.
> >
> > -- QUESTION: Because we observed a significant (30% or so) and
> > reproducible slowdown during restoration, it seems like KIP-892 uses the
> > checkpointing behavior during restoration as well? If so, I would argue
> > that this might not be necessary, because everything we write is already
> > committed, so we don't need to change the behavior during restoration or
> > standby tasks. Perhaps we could write the offsets to RocksDB on every
> > batch (or even every 5 seconds or so).
> >
> > -- Note: This was a very small-scale test, with <1GB of state (as I
> > didn't have time to spend hours building up state). In the past I have
> > noted that RocksDB performance degrades significantly after 25GB of state
> > in one store. Future work involves determining the performance impact of
> > KIP-892 relative to trunk at larger scale, since it's possible that the
> > relative behaviors are far different (i.e. relative to trunk, 892's
> > processing and restoration throughput might be much better or much worse).
> >
> > -- Note: For those who want to replicate the tests, you can find the
> > branch of our streams app here:
> > https://github.com/littlehorse-enterprises/littlehorse/tree/minor/testing-streams-forks
> > . The example I ran was `examples/hundred-tasks`, and I ran the server
> > with `./local-dev/do-server.sh one-partition`. The `STREAMS_TESTS.md`
> > file has a detailed breakdown of the testing.
> >
> > Anyways, I'm super excited about this KIP and if a bit more future
> > testing goes well, we plan to ship our product with a build of KIP-892,
> > Speedb OSS, and potentially a few other minor tweaks that we are thinking
> > about.
> >
> > Thanks Nick!
> >
> > Ride well,
> > Colt McNealy
> >
> > *Founder, LittleHorse.dev*
> >
> >
> > On Thu, Aug 24, 2023 at 3:19 AM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> >
> > > Hi Bruno,
> > >
> > > Thanks for taking the time to review the KIP. I'm back from leave now
> and
> > > intend to move this forwards as quickly as I can.
> > >
> > > Addressing your points:
> > >
> > > 1.
> > > Because flush() is part of the StateStore API, it's exposed to custom
> > > Processors, which might be making calls to flush(). This was actually the
> > > case in a few integration tests.
> > > To maintain as much compatibility as possible, I'd prefer not to make this
> > > an UnsupportedOperationException, as it will cause previously working
> > > Processors to start throwing exceptions at runtime.
> > > I agree that it doesn't make sense for it to proxy commit(), though, as
> > > that would cause it to violate the "StateStores commit only when the Task
> > > commits" rule.
> > > Instead, I think we should make this a no-op. That way, existing user
> > > Processors will continue to work as-before, without violation of store
> > > consistency that would be caused by premature flush/commit of StateStore
> > > data to disk.
> > > What do you think?
> > >
> > > 2.
> > > As stated in the JavaDoc, when a StateStore implementation is
> > > transactional, but is unable to estimate the uncommitted memory usage,
> > the
> > > method will return -1.
> > > The intention here is to permit third-party implementations that may
> not
> > be
> > > able to estimate memory usage.
> > >
> > > Yes, it will be 0 when nothing has been written to the store yet. I
> > thought
> > > that was implied by "This method will return an approximation of the
> > memory
> > > would be freed by the next call to {@link #commit(Map)}" and "@return
> The
> > > approximate size of all records awaiting {@link #commit(Map)}",
> however,
> > I
> > > can add it explicitly to the JavaDoc if you think this is unclear?
> > >
> > > 3.
> > > I realise this is probably the most contentious point in my design, and
> > I'm
> > > open to changing it if I'm unable to convince you of the benefits.
> > > Nevertheless, here's my argument:
> > > The Interactive Query (IQ) API(s) are directly provided StateStores to
> > > query, and it may be important for users to programmatically know which
> > > mode the StateStore is operating under. If we simply provide an
> > > "eosEnabled" boolean (as used throughout the internal streams engine),
> or
> > > similar, then users will need to understand the operation and
> > consequences
> > > of each available processing mode and how it pertains to their
> > StateStore.
> > >
> > > Interactive Query users aren't the only people that care about the
> > > processing.mode/IsolationLevel of a StateStore: implementers of custom
> > > StateStores also need to understand the behaviour expected of their
> > > implementation. KIP-892 introduces some assumptions into the Streams
> > Engine
> > > about how StateStores operate under each processing mode, and it's
> > > important that custom implementations adhere to those assumptions in
> > order
> > > to maintain the consistency guarantees.
> > >
> > > IsolationLevels provide a high-level contract on the behaviour of the
> > > StateStore: a user knows that under READ_COMMITTED, they will see
> writes
> > > only after the Task has committed, and under READ_UNCOMMITTED they will
> > see
> > > writes immediately. No understanding of the details of each
> > processing.mode
> > > is required, either for IQ users or StateStore implementers.
> > >
> > > An argument can be made that these contractual guarantees can simply be
> > > documented for the processing.mode (i.e. that exactly-once and
> > > exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves
> like
> > > READ_UNCOMMITTED), but there are several small issues with this I'd
> > prefer
> > > to avoid:
> > >
> > >    - Where would we document these contracts, in a way that is
> difficult
> > >    for users/implementers to miss/ignore?
> > >    - It's not clear to users that the processing mode is communicating
> > >    an expectation of read isolation, unless they read the
> documentation.
> > > Users
> > >    rarely consult documentation unless they feel they need to, so it's
> > > likely
> > >    this detail would get missed by many users.
> > >    - It tightly couples processing modes to read isolation. Adding new
> > >    processing modes, or changing the read isolation of existing
> > processing
> > >    modes would be difficult/impossible.
> > >
> > > Ultimately, the cost of introducing IsolationLevels is just a single
> > > method, since we re-use the existing IsolationLevel enum from Kafka.
> This
> > > gives us a clear place to document the contractual guarantees expected
> > > of/provided by StateStores, that is accessible both by the StateStore
> > > itself, and by IQ users.
> > >
> > > (Writing this I've just realised that the StateStore and IQ APIs
> actually
> > > don't provide access to StateStoreContext that IQ users would have
> direct
> > > access to... Perhaps StateStore should expose isolationLevel() itself
> > too?)
> > >
> > > 4.
> > > Yeah, I'm not comfortable renaming the metrics in-place either, as
> it's a
> > > backwards incompatible change. My concern is that, if we leave the
> > existing
> > > "flush" metrics in place, they will be confusing to users. Right now,
> > > "flush" metrics record explicit flushes to disk, but under KIP-892,
> even
> > a
> > > commit() will not explicitly flush data to disk - RocksDB will decide
> on
> > > when to flush memtables to disk itself.
> > >
> > > If we keep the existing "flush" metrics, we'd have two options, which
> > both
> > > seem pretty bad to me:
> > >
> > >    1. Have them record calls to commit(), which would be misleading, as
> > >    data is no longer explicitly "flushed" to disk by this call.
> > >    2. Have them record nothing at all, which is equivalent to removing
> > the
> > >    metrics, except that users will see the metric still exists and so
> > > assume
> > >    that the metric is correct, and that there's a problem with their
> > system
> > >    when there isn't.
> > >
> > > I agree that removing them is also a bad solution, and I'd like some
> > > guidance on the best path forward here.
> > >
> > > 5.
> > > Position files are updated on every write to a StateStore. Since our
> > > writes are now buffered until commit(), we can't update the Position file
> > > until commit() has been called, otherwise it would be inconsistent with
> > > the data in the event of a rollback. Consequently, we need to manage these
> > > offsets the same way we manage the checkpoint offsets, and ensure they're
> > > only written on commit().
> > >
> > > 6.
> > > Agreed, although I'm not exactly sure yet what tests to write. How
> > explicit
> > > do we need to be here in the KIP?
> > >
> > > As for upgrade/downgrade: upgrade is designed to be seamless, and we
> > > should definitely add some tests around that. Downgrade, it transpires,
> > > isn't currently possible, as the extra column family for offset storage is
> > > incompatible with the pre-KIP-892 implementation: when you open a RocksDB
> > > database, you must open all available column families or receive an error.
> > > What currently happens on downgrade is that it attempts to open the store,
> > > throws an error about the offsets column family not being opened, which
> > > triggers a wipe and rebuild of the Task. Given that downgrades should be
> > > uncommon, I think this is acceptable behaviour, as the end-state is
> > > consistent, even if it results in an undesirable state restore.
> > >
> > > Should I document the upgrade/downgrade behaviour explicitly in the
> KIP?
> > >
> > > --
> > >
> > > Regards,
> > > Nick
> > >
> > >
> > > On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org>
> wrote:
> > >
> > > > Hi Nick!
> > > >
> > > > Thanks for the updates!
> > > >
> > > > 1.
> > > > Why does StateStore#flush() default to
> > > > StateStore#commit(Collections.emptyMap())?
> > > > Since calls to flush() will not exist anymore after this KIP is
> > > > released, I would rather throw an unsupported operation exception by
> > > > default.
> > > >
> > > >
> > > > 2.
> > > > When would a state store return -1 from
> > > > StateStore#approximateNumUncommittedBytes() while being
> transactional?
> > > >
> > > > Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if
> > > > the state store is transactional but nothing has been written to the
> > > > state store yet?
> > > >
> > > >
> > > > 3.
> > > > Sorry for bringing this up again. Does this KIP really need to
> > introduce
> > > > StateStoreContext#isolationLevel()? StateStoreContext has already
> > > > appConfigs() which basically exposes the same information, i.e., if
> EOS
> > > > is enabled or not.
> > > > In one of your previous e-mails you wrote:
> > > >
> > > > "My idea was to try to keep the StateStore interface as loosely
> coupled
> > > > from the Streams engine as possible, to give implementers more
> freedom,
> > > > and reduce the amount of internal knowledge required."
> > > >
> > > > While I understand the intent, I doubt that it decreases the coupling
> > of
> > > > a StateStore interface and the Streams engine. READ_COMMITTED only
> > > > applies to IQ but not to reads by processors. Thus, implementers need
> > to
> > > > understand how Streams accesses the state stores.
> > > >
> > > > I would like to hear what others think about this.
> > > >
> > > >
> > > > 4.
> > > > > Great to see new metrics exposed for transactional state stores!
> > > > > However, I would prefer to add new metrics and deprecate (in the docs)
> > > > > the old ones. You can find examples of deprecated metrics here:
> > > > https://kafka.apache.org/documentation/#selector_monitoring
> > > >
> > > >
> > > > 5.
> > > > Why does the KIP mention position files? I do not think they are
> > related
> > > > to transactions or flushes.
> > > >
> > > >
> > > > 6.
> > > > I think we will also need to adapt/add integration tests besides unit
> > > > tests. Additionally, we probably need integration or system tests to
> > > > verify that upgrades and downgrades between transactional and
> > > > non-transactional state stores work as expected.
> > > >
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 7/21/23 10:34 PM, Nick Telford wrote:
> > > > > One more thing: I noted John's suggestion in the KIP, under
> "Rejected
> > > > > Alternatives". I still think it's an idea worth pursuing, but I
> > believe
> > > > > that it's out of the scope of this KIP, because it solves a
> different
> > > set
> > > > > of problems to this KIP, and the scope of this one has already
> grown
> > > > quite
> > > > > large!
> > > > >
> > > > > On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > >> Hi everyone,
> > > > >>
> > > > > >> I've updated the KIP
> > > > > >> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores)
> > > > > >> with the latest changes; mostly bringing back "Atomic Checkpointing"
> > > > > >> (for what feels like the 10th time!). I think the one thing missing is
> > > > > >> some changes to metrics (notably the store "flush" metrics will need to
> > > > > >> be renamed to "commit").
> > > > >>
> > > > >> The reason I brought back Atomic Checkpointing was to decouple
> store
> > > > flush
> > > > >> from store commit. This is important, because with Transactional
> > > > >> StateStores, we now need to call "flush" on *every* Task commit,
> and
> > > not
> > > > >> just when the StateStore is closing, otherwise our transaction
> > buffer
> > > > will
> > > > >> never be written and persisted, instead growing unbounded! I
> > > > experimented
> > > > >> with some simple solutions, like forcing a store flush whenever
> the
> > > > >> transaction buffer was likely to exceed its configured size, but
> > this
> > > > was
> > > > >> brittle: it prevented the transaction buffer from being configured
> > to
> > > be
> > > > >> unbounded, and it still would have required explicit flushes of
> > > RocksDB,
> > > > >> yielding sub-optimal performance and memory utilization.
> > > > >>
> > > > >> I deemed Atomic Checkpointing to be the "right" way to resolve
> this
> > > > >> problem. By ensuring that the changelog offsets that correspond to
> > the
> > > > most
> > > > >> recently written records are always atomically written to the
> > > StateStore
> > > > >> (by writing them to the same transaction buffer), we can avoid
> > > forcibly
> > > > >> flushing the RocksDB memtables to disk, letting RocksDB flush them
> > > only
> > > > >> when necessary, without losing any of our consistency guarantees.
> > See
> > > > the
> > > > >> updated KIP for more info.
> > > > >>
> > > > >> I have fully implemented these changes, although I'm still not
> > > entirely
> > > > >> happy with the implementation for segmented StateStores, so I plan
> > to
> > > > >> refactor that. Despite that, all tests pass. If you'd like to try
> > out
> > > or
> > > > >> review this highly experimental and incomplete branch, it's
> > available
> > > > here:
> > > > >> https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note:
> it's
> > > > built
> > > > >> against Kafka 3.5.0 so that I had a stable base to build and test
> it
> > > on,
> > > > >> and to enable easy apples-to-apples comparisons in a live
> > > environment. I
> > > > >> plan to rebase it against trunk once it's nearer completion and
> has
> > > been
> > > > >> proven on our main application.
> > > > >>
> > > > >> I would really appreciate help in reviewing and testing:
> > > > >> - Segmented (Versioned, Session and Window) stores
> > > > >> - Global stores
> > > > >>
> > > > > >> I do not currently use either of these, so my primary test
> > > > > >> environment doesn't test these areas.
> > > > >>
> > > > >> I'm going on Parental Leave starting next week for a few weeks, so
> > > will
> > > > >> not have time to move this forward until late August. That said,
> > your
> > > > >> feedback is welcome and appreciated, I just won't be able to
> respond
> > > as
> > > > >> quickly as usual.
> > > > >>
> > > > >> Regards,
> > > > >> Nick
> > > > >>
> > > > >> On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com
> >
> > > > wrote:
> > > > >>
> > > > >>> Hi Bruno
> > > > >>>
> > > > >>> Yes, that's correct, although the impact on IQ is not something I
> > had
> > > > >>> considered.
> > > > >>>
> > > > > >>>> What about atomically updating the state store from the transaction
> > > > > >>>> buffer every commit interval and writing the checkpoint (thus,
> > > > > >>>> flushing the memtable) every configured amount of data and/or number
> > > > > >>>> of commit intervals?
> > > > >>>>
> > > > >>>
> > > > >>> I'm not quite sure I follow. Are you suggesting that we add an
> > > > additional
> > > > >>> config for the max number of commit intervals between
> checkpoints?
> > > That
> > > > >>> way, we would checkpoint *either* when the transaction buffers
> are
> > > > nearly
> > > > >>> full, *OR* whenever a certain number of commit intervals have
> > > elapsed,
> > > > >>> whichever comes first?
> > > > >>>
> > > > >>> That certainly seems reasonable, although this re-ignites an
> > earlier
> > > > >>> debate about whether a config should be measured in "number of
> > commit
> > > > >>> intervals", instead of just an absolute time.
> > > > >>>
> > > > >>> FWIW, I realised that this issue is the reason I was pursuing the
> > > > Atomic
> > > > >>> Checkpoints, as it de-couples memtable flush from checkpointing,
> > > which
> > > > >>> enables us to just checkpoint on every commit without any
> > performance
> > > > >>> impact. Atomic Checkpointing is definitely the "best" solution,
> but
> > > > I'm not
> > > > >>> sure if this is enough to bring it back into this KIP.
> > > > >>>
> > > > >>> I'm currently working on moving all the transactional logic
> > directly
> > > > into
> > > > >>> RocksDBStore itself, which does away with the
> > > StateStore#newTransaction
> > > > >>> method, and reduces the number of new classes introduced,
> > > significantly
> > > > >>> reducing the complexity. If it works, and the complexity is
> > > drastically
> > > > >>> reduced, I may try bringing back Atomic Checkpoints into this
> KIP.
> > > > >>>
> > > > >>> Regards,
> > > > >>> Nick
> > > > >>>
> > > > >>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org>
> > > wrote:
> > > > >>>
> > > > >>>> Hi Nick,
> > > > >>>>
> > > > >>>> Thanks for the insights! Very interesting!
> > > > >>>>
> > > > >>>> As far as I understand, you want to atomically update the state
> > > store
> > > > >>>> from the transaction buffer, flush the memtable of a state store
> > and
> > > > >>>> write the checkpoint not after the commit time elapsed but after
> > the
> > > > >>>> transaction buffer reached a size that would lead to exceeding
> > > > >>>> statestore.transaction.buffer.max.bytes before the next commit
> > > > interval
> > > > >>>> ends.
> > > > >>>> That means, the Kafka transaction would commit every commit
> > interval
> > > > but
> > > > >>>> the state store will only be atomically updated roughly every
> > > > >>>> statestore.transaction.buffer.max.bytes of data. Also IQ would
> > then
> > > > only
> > > > >>>> see new data roughly every
> > statestore.transaction.buffer.max.bytes.
> > > > >>>> After a failure the state store needs to restore up to
> > > > >>>> statestore.transaction.buffer.max.bytes.
> > > > >>>>
> > > > >>>> Is this correct?
> > > > >>>>
> > > > >>>> What about atomically updating the state store from the
> > transaction
> > > > >>>> buffer every commit interval and writing the checkpoint (thus,
> > > > flushing
> > > > >>>> the memtable) every configured amount of data and/or number of
> > > commit
> > > > >>>> intervals? In such a way, we would have the same delay for
> records
> > > > >>>> appearing in output topics and IQ because both would appear when
> > the
> > > > >>>> Kafka transaction is committed. However, after a failure the
> state
> > > > store
> > > > >>>> still needs to restore up to
> > statestore.transaction.buffer.max.bytes
> > > > and
> > > > >>>> it might restore data that is already in the state store because
> > the
> > > > >>>> checkpoint lags behind the last stable offset (i.e. the last
> > > committed
> > > > > >>>> offset) of the changelog topics. Restoring data that is already in
> > > > > >>>> the state store is idempotent, so EOS should not be violated.
> > > > >>>> This solution needs at least one new config to specify when a
> > > > checkpoint
> > > > >>>> should be written.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > > >>>> A small correction to your previous e-mail that does not change
> > > > > >>>> anything you said: Under ALOS the default commit interval is 30
> > > > > >>>> seconds, not five seconds.
> > > > >>>>
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Bruno
> > > > >>>>
> > > > >>>>
> > > > >>>> On 01.07.23 12:37, Nick Telford wrote:
> > > > >>>>> Hi everyone,
> > > > >>>>>
> > > > >>>>> I've begun performance testing my branch on our staging
> > > environment,
> > > > >>>>> putting it through its paces in our non-trivial application.
> I'm
> > > > >>>> already
> > > > >>>>> observing the same increased flush rate that we saw the last
> time
> > > we
> > > > >>>>> attempted to use a version of this KIP, but this time, I think
> I
> > > know
> > > > >>>> why.
> > > > >>>>>
> > > > >>>>> Pre-KIP-892, StreamTask#postCommit, which is called at the end
> of
> > > the
> > > > >>>> Task
> > > > >>>>> commit process, has the following behaviour:
> > > > >>>>>
> > > > >>>>>      - Under ALOS: checkpoint the state stores. This includes
> > > > >>>>>      flushing memtables in RocksDB. This is acceptable because
> > the
> > > > >>>> default
> > > > >>>>>      commit.interval.ms is 5 seconds, so forcibly flushing
> > > memtables
> > > > >>>> every 5
> > > > >>>>>      seconds is acceptable for most applications.
> > > > >>>>>      - Under EOS: checkpointing is not done, *unless* it's
> being
> > > > >>>> forced, due
> > > > >>>>>      to e.g. the Task closing or being revoked. This means that
> > > under
> > > > >>>> normal
> > > > >>>>>      processing conditions, the state stores will not be
> > > > checkpointed,
> > > > >>>> and will
> > > > >>>>>      not have memtables flushed at all , unless RocksDB decides
> > to
> > > > >>>> flush them on
> > > > >>>>>      its own. Checkpointing stores and force-flushing their
> > > memtables
> > > > >>>> is only
> > > > >>>>>      done when a Task is being closed.
> > > > >>>>>
> > > > >>>>> Under EOS, KIP-892 needs to checkpoint stores on at least
> *some*
> > > > normal
> > > > >>>>> Task commits, in order to write the RocksDB transaction buffers
> > to
> > > > the
> > > > >>>>> state stores, and to ensure the offsets are synced to disk to
> > > prevent
> > > > >>>>> restores from getting out of hand. Consequently, my current
> > > > >>>> implementation
> > > > >>>>> calls maybeCheckpoint on *every* Task commit, which is far too
> > > > >>>> frequent.
> > > > >>>>> This causes checkpoints every 10,000 records, which is a change
> > in
> > > > >>>> flush
> > > > >>>>> behaviour, potentially causing performance problems for some
> > > > >>>> applications.
> > > > >>>>>
> > > > >>>>> I'm looking into possible solutions, and I'm currently leaning
> > > > towards
> > > > >>>>> using the statestore.transaction.buffer.max.bytes configuration
> > to
> > > > >>>>> checkpoint Tasks once we are likely to exceed it. This would
> > > > >>>> complement the
> > > > >>>>> existing "early Task commit" functionality that this
> > configuration
> > > > >>>>> provides, in the following way:
> > > > >>>>>
> > > > >>>>>      - Currently, we use
> statestore.transaction.buffer.max.bytes
> > to
> > > > >>>> force an
> > > > >>>>>      early Task commit if processing more records would cause
> our
> > > > state
> > > > >>>> store
> > > > >>>>>      transactions to exceed the memory assigned to them.
> > > > >>>>>      - New functionality: when a Task *does* commit, we will
> not
> > > > >>>> checkpoint
> > > > >>>>>      the stores (and hence flush the transaction buffers)
> unless
> > we
> > > > >>>> expect to
> > > > >>>>>      cross the statestore.transaction.buffer.max.bytes
> threshold
> > > > before
> > > > >>>> the next
> > > > >>>>>      commit
> > > > >>>>>
> > > > >>>>> I'm also open to suggestions.
> > > > >>>>>
> > > > >>>>> Regards,
> > > > >>>>> Nick
> > > > >>>>>
> > > > >>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
> > nick.telf...@gmail.com
> > > >
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi Bruno!
> > > > >>>>>>
> > > > >>>>>> 3.
> > > > >>>>>> By "less predictable for users", I meant in terms of
> > understanding
> > > > the
> > > > >>>>>> performance profile under various circumstances. The more
> > complex
> > > > the
> > > > >>>>>> solution, the more difficult it would be for users to
> understand
> > > the
> > > > >>>>>> performance they see. For example, spilling records to disk
> when
> > > the
> > > > >>>>>> transaction buffer reaches a threshold would, I expect, reduce
> > > write
> > > > >>>>>> throughput. This reduction in write throughput could be
> > > unexpected,
> > > > >>>> and
> > > > >>>>>> potentially difficult to diagnose/understand for users.
> > > > >>>>>> At the moment, I think the "early commit" concept is
> relatively
> > > > >>>>>> straightforward; it's easy to document, and conceptually
> fairly
> > > > >>>> obvious to
> > > > >>>>>> users. We could probably add a metric to make it easier to
> > > > understand
> > > > >>>> when
> > > > >>>>>> it happens though.
> > > > >>>>>>
> > > > >>>>>> 3. (the second one)
> > > > >>>>>> The IsolationLevel is *essentially* an indirect way of telling
> > > > >>>> StateStores
> > > > >>>>>> whether they should be transactional. READ_COMMITTED
> essentially
> > > > >>>> requires
> > > > >>>>>> transactions, because it dictates that two threads calling
> > > > >>>>>> `newTransaction()` should not see writes from the other
> > > transaction
> > > > >>>> until
> > > > >>>>>> they have been committed. With READ_UNCOMMITTED, all bets are
> > off,
> > > > and
> > > > >>>>>> stores can allow threads to observe written records at any
> time,
> > > > >>>> which is
> > > > >>>>>> essentially "no transactions". That said, StateStores are free
> > to
> > > > >>>> implement
> > > > >>>>>> these guarantees however they can, which is a bit more relaxed
> > > than
> > > > >>>>>> dictating "you must use transactions". For example, with
> RocksDB
> > > we
> > > > >>>> would
> > > > >>>>>> implement these as READ_COMMITTED == WBWI-based
> "transactions",
> > > > >>>>>> READ_UNCOMMITTED == direct writes to the database. But with
> > other
> > > > >>>> storage
> > > > >>>>>> engines, it might be preferable to *always* use transactions,
> > even
> > > > >>>> when
> > > > >>>>>> unnecessary; or there may be storage engines that don't
> provide
> > > > >>>>>> transactions, but the isolation guarantees can be met using a
> > > > >>>> different
> > > > >>>>>> technique.
> > > > >>>>>> My idea was to try to keep the StateStore interface as loosely
> > > > coupled
> > > > >>>>>> from the Streams engine as possible, to give implementers more
> > > > >>>> freedom, and
> > > > >>>>>> reduce the amount of internal knowledge required.
> > > > >>>>>> That said, I understand that "IsolationLevel" might not be the
> > > right
> > > > >>>>>> abstraction, and we can always make it much more explicit if
> > > > >>>> required, e.g.
> > > > >>>>>> boolean transactional()
> > > > >>>>>>
> > > > >>>>>> 7-8.
> > > > >>>>>> I can make these changes either later today or tomorrow.
> > > > >>>>>>
> > > > >>>>>> Small update:
> > > > >>>>>> I've rebased my branch on trunk and fixed a bunch of issues
> that
> > > > >>>> needed
> > > > >>>>>> addressing. Currently, all the tests pass, which is promising,
> > but
> > > > it
> > > > >>>> will
> > > > >>>>>> need to undergo some performance testing. I haven't (yet)
> worked
> > > on
> > > > >>>>>> removing the `newTransaction()` stuff, but I would expect
> that,
> > > > >>>>>> behaviourally, it should make no difference. The branch is
> > > available
> > > > >>>> at
> > > > >>>>>> https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone
> > is
> > > > >>>>>> interested in taking an early look.
> > > > >>>>>>
> > > > >>>>>> Regards,
> > > > >>>>>> Nick
> > > > >>>>>>
> > > > >>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <
> cado...@apache.org
> > >
> > > > >>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Nick,
> > > > >>>>>>>
> > > > >>>>>>> 1.
> > > > >>>>>>> Yeah, I agree with you. That was actually also my point. I
> > > > understood
> > > > >>>>>>> that John was proposing the ingestion path as a way to avoid
> > the
> > > > >>>> early
> > > > >>>>>>> commits. Probably, I misinterpreted the intent.
> > > > >>>>>>>
> > > > >>>>>>> 2.
> > > > >>>>>>> I agree with John here, that actually it is public API. My
> > > question
> > > > >>>> is
> > > > >>>>>>> how this usage pattern affects normal processing.
> > > > >>>>>>>
> > > > >>>>>>> 3.
> > > > >>>>>>> My concern is that checking for the size of the transaction
> > > buffer
> > > > >>>> and
> > > > >>>>>>> maybe triggering an early commit affects the whole processing
> > of
> > > > >>>> Kafka
> > > > >>>>>>> Streams. The transactionality of a state store is not
> confined
> > to
> > > > the
> > > > >>>>>>> state store itself, but spills over and changes the behavior
> of
> > > > other
> > > > >>>>>>> parts of the system. I agree with you that it is a decent
> > > > >>>> compromise. I
> > > > >>>>>>> just wanted to analyse the downsides and list the options to
> > > > overcome
> > > > >>>>>>> them. I also agree with you that all options seem quite heavy
> > > > >>>> compared
> > > > >>>>>>> with your KIP. I do not understand what you mean with "less
> > > > >>>> predictable
> > > > >>>>>>> for users", though.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> I found the discussions about the alternatives really
> > > interesting.
> > > > >>>> But I
> > > > >>>>>>> also think that your plan sounds good and we should continue
> > with
> > > > it!
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Some comments on your reply to my e-mail on June 20th:
> > > > >>>>>>>
> > > > >>>>>>> 3.
> > > > >>>>>>> Ah, now, I understand the reasoning behind putting isolation
> > > level
> > > > in
> > > > >>>>>>> the state store context. Thanks! Should that also be a way to
> > > give
> > > > >>>> the
> > > > > >>>>>>> state store the opportunity to decide whether to turn on
> > > > >>>>>>> transactions or not?
> > > > >>>>>>> With my comment, I was more concerned about how do you know
> if
> > a
> > > > >>>>>>> checkpoint file needs to be written under EOS, if you do not
> > > have a
> > > > >>>> way
> > > > >>>>>>> to know if the state store is transactional or not. If a
> state
> > > > store
> > > > >>>> is
> > > > >>>>>>> transactional, the checkpoint file can be written during
> normal
> > > > >>>>>>> processing under EOS. If the state store is not
> transactional,
> > > the
> > > > >>>>>>> checkpoint file must not be written under EOS.
> > > > >>>>>>>
> > > > >>>>>>> 7.
> > > > >>>>>>> My point was about not only considering the bytes in memory
> in
> > > > config
> > > > >>>>>>> statestore.uncommitted.max.bytes, but also bytes that might
> be
> > > > >>>> spilled
> > > > >>>>>>> on disk. Basically, I was wondering whether you should remove
> > the
> > > > >>>>>>> "memory" in "Maximum number of memory bytes to be used to
> > > > >>>>>>> buffer uncommitted state-store records." My thinking was that
> > > even
> > > > >>>> if a
> > > > >>>>>>> state store spills uncommitted bytes to disk, limiting the
> > > overall
> > > > >>>> bytes
> > > > >>>>>>> might make sense. Thinking about it again and considering the
> > > > recent
> > > > >>>>>>> discussions, it does not make too much sense anymore.
> > > > >>>>>>> I like the name statestore.transaction.buffer.max.bytes that
> > you
> > > > >>>> proposed.
> > > > >>>>>>>
> > > > >>>>>>> 8.
> > > > >>>>>>> A high-level description (without implementation details) of
> > how
> > > > >>>> Kafka
> > > > >>>>>>> Streams will manage the commit of changelog transactions,
> state
> > > > store
> > > > >>>>>>> transactions and checkpointing would be great. Would be great
> > if
> > > > you
> > > > >>>>>>> could also add some sentences about the behavior in case of a
> > > > >>>> failure.
> > > > >>>>>>> For instance how does a transactional state store recover
> > after a
> > > > >>>>>>> failure or what happens with the transaction buffer, etc.
> (that
> > > is
> > > > >>>> what
> > > > >>>>>>> I meant by "fail-over" in point 9.)
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Bruno
> > > > >>>>>>>
> > > > >>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > > > >>>>>>>> Hi Bruno,
> > > > >>>>>>>>
> > > > >>>>>>>> 1.
> > > > >>>>>>>> Isn't this exactly the same issue that WriteBatchWithIndex
> > > > >>>> transactions
> > > > >>>>>>>> have, whereby exceeding (or likely to exceed) configured
> > memory
> > > > >>>> needs to
> > > > >>>>>>>> trigger an early commit?
> > > > >>>>>>>>
> > > > >>>>>>>> 2.
> > > > >>>>>>>> This is one of my big concerns. Ultimately, any approach
> based
> > > on
> > > > >>>>>>> cracking
> > > > >>>>>>>> open RocksDB internals and using it in ways it's not really
> > > > designed
> > > > >>>>>>> for is
> > > > > >>>>>>>> likely to have some unforeseen performance or consistency
> > issues.
> > > > >>>>>>>>
> > > > >>>>>>>> 3.
> > > > >>>>>>>> What's your motivation for removing these early commits?
> While
> > > not
> > > > >>>>>>> ideal, I
> > > > >>>>>>>> think they're a decent compromise to ensure consistency
> whilst
> > > > >>>>>>> maintaining
> > > > >>>>>>>> good and predictable performance.
> > > > >>>>>>>> All 3 of your suggested ideas seem *very* complicated, and
> > might
> > > > >>>>>>> actually
> > > > >>>>>>>> make behaviour less predictable for users as a consequence.
> > > > >>>>>>>>
> > > > >>>>>>>> I'm a bit concerned that the scope of this KIP is growing a
> > bit
> > > > out
> > > > >>>> of
> > > > >>>>>>>> control. While it's good to discuss ideas for future
> > > > improvements, I
> > > > >>>>>>> think
> > > > >>>>>>>> it's important to narrow the scope down to a design that
> > > achieves
> > > > >>>> the
> > > > >>>>>>> most
> > > > >>>>>>>> pressing objectives (constant sized restorations during
> dirty
> > > > >>>>>>>> close/unexpected errors). Any design that this KIP produces
> > can
> > > > >>>>>>> ultimately
> > > > >>>>>>>> be changed in the future, especially if the bulk of it is
> > > internal
> > > > >>>>>>>> behaviour.
> > > > >>>>>>>>
> > > > >>>>>>>> I'm going to spend some time next week trying to re-work the
> > > > >>>> original
> > > > >>>>>>>> WriteBatchWithIndex design to remove the newTransaction()
> > > method,
> > > > >>>> such
> > > > >>>>>>> that
> > > > >>>>>>>> it's just an implementation detail of RocksDBStore. That
> way,
> > if
> > > > we
> > > > >>>>>>> want to
> > > > >>>>>>>> replace WBWI with something in the future, like the SST file
> > > > >>>> management
> > > > >>>>>>>> outlined by John, then we can do so with little/no API
> > changes.
> > > > >>>>>>>>
> > > > >>>>>>>> Regards,
> > > > >>>>>>>>
> > > > >>>>>>>> Nick
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >
> > > >
> > >
> >
>
