> Note that users can disable the cache, which would still be
ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.

In this case, how would uncommitted records be read by joins?

On Tue, 20 Jun 2023, 20:51 John Roesler, <vvcep...@apache.org> wrote:

> Ah, sorry Nick,
>
> I just meant the regular heap based cache that we maintain in Streams. I
> see that it's not called "RecordCache" (my mistake).
>
> The actual cache is ThreadCache:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
>
> Here's the example of how we use the cache in KeyValueStore:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
>
> It's basically just an on-heap Map of records that have not yet been
> written to the changelog or flushed into the underlying store. It gets
> flushed when the total cache size exceeds `cache.max.bytes.buffering` or
> the `commit.interval.ms` elapses.
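
The two flush triggers described above can be modelled in a few lines. This is only a sketch: `CacheFlushPolicy` is a hypothetical helper, not a Kafka Streams class, and it does not attempt to reproduce the actual logic in ThreadCache. The two config names come from the message above; the fallback values are illustrative assumptions.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams: models the two flush triggers only.
final class CacheFlushPolicy {
    private final long maxCacheBytes;
    private final long commitIntervalMs;

    CacheFlushPolicy(Properties config) {
        // Config names are taken from the thread; the fallback values are illustrative.
        this.maxCacheBytes =
            Long.parseLong(config.getProperty("cache.max.bytes.buffering", "10485760"));
        this.commitIntervalMs =
            Long.parseLong(config.getProperty("commit.interval.ms", "30000"));
    }

    // Flush when the cache has outgrown its budget or the commit interval has elapsed.
    boolean shouldFlush(long cachedBytes, long msSinceLastCommit) {
        return cachedBytes > maxCacheBytes || msSinceLastCommit >= commitIntervalMs;
    }
}
```

With the cache disabled, only the interval trigger would remain in this model.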
>
> Speaking of those configs, another benefit to this idea is that we would
> no longer need to trigger extra commits based on the size of the ongoing
> transaction. Instead, we'd just preserve the existing cache-flush
> behavior. Note that users can disable the cache, which would still be
> ok, I think. We wouldn't ingest the SST files on every record, but just
> append to them and only ingest them on commit, when we're already
> waiting for acks and a RocksDB commit.
>
> Thanks,
> -John
>
> On 6/20/23 14:09, Nick Telford wrote:
> > Hi John,
> >
> > By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
> > class called "RecordCache"...
> >
> > Cheers,
> >
> > Nick
> >
> > On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org> wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for picking this up again!
> >>
> >> I did have one new thought over the intervening months, which I'd like
> >> your take on.
> >>
> >> What if, rather than using the RocksDB atomic write primitive at all, we
> >> just:
> >> 1. disable memtables entirely
> >> 2. directly write the RecordCache into SST files when we flush
> >> 3. atomically ingest the SST file(s) into RocksDB when we get the ACK
> >> from the changelog (see
> >> https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
> >> and
> >> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
> >> and
> >> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
> >> )
> >> 4. track the changelog offsets either in another CF or the same CF with
> >> a reserved key, either of which will make the changelog offset update
> >> atomic with the file ingestions
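
The four steps above can be sketched as a toy in-memory model. Nothing here calls RocksDB or Kafka Streams: `StagedIngestStore`, its methods, and the reserved key name are all hypothetical, and a `TreeMap` merely stands in for an SST file. The point is only to show staged data and the changelog offset becoming visible together.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of the proposal; none of these types are RocksDB or Kafka Streams APIs.
final class StagedIngestStore {
    static final String OFFSET_KEY = "__changelog_offset"; // hypothetical reserved key

    // Stands in for the data already ingested into the store.
    private final TreeMap<String, String> committed = new TreeMap<>();
    // Stands in for an SST file that has been written but not yet ingested.
    private TreeMap<String, String> staged = new TreeMap<>();

    // Step 2: a cache flush appends records to the staged file, not to the live store.
    void flushCache(Map<String, String> dirtyEntries) {
        staged.putAll(dirtyEntries);
    }

    // Steps 3 and 4: on changelog ACK, data and offset become visible in one step
    // (modelled here as a single putAll in a single-threaded setting).
    void onChangelogAck(long changelogOffset) {
        TreeMap<String, String> batch = staged;
        staged = new TreeMap<>();
        batch.put(OFFSET_KEY, Long.toString(changelogOffset));
        committed.putAll(batch);
    }

    // A crash before the ACK simply discards the staged file.
    void crash() {
        staged = new TreeMap<>();
    }

    String get(String key) {
        return committed.get(key);
    }
}
```

In this model a reader of `get` never observes data whose offset has not also been recorded, which is the property step 4 is after.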
> >>
> >> I suspect this'll have a number of benefits:
> >> * writes to RocksDB will always be atomic
> >> * we don't fragment memory between the RecordCache and the memtables
> >> * RecordCache gives far higher performance than memtable for reads and
> >> writes
> >> * we don't need any new "transaction" concepts or memory bound configs
> >>
> >> What do you think?
> >>
> >> Thanks,
> >> -John
> >>
> >> On 6/20/23 10:51, Nick Telford wrote:
> >>> Hi Bruno,
> >>>
> >>> Thanks for reviewing the KIP. It's been a long road: I started working on
> >>> this more than a year ago, and most of the time in the last 6 months has
> >>> been spent on the "Atomic Checkpointing" stuff that's been benched, so some
> >>> of the reasoning behind my decisions has been lost, but I'll do my
> >>> best to reconstruct it.
> >>>
> >>> 1.
> >>> IIRC, this was the initial approach I tried. I don't remember the exact
> >>> reasons I changed it to use a separate "view" of the StateStore that
> >>> encapsulates the transaction, but I believe it had something to do with
> >>> concurrent access to the StateStore from Interactive Query threads. Reads
> >>> from interactive queries need to be isolated from the currently ongoing
> >>> transaction, both for consistency (so interactive queries don't observe
> >>> changes that are subsequently rolled back) and to prevent Iterators
> >>> opened by an interactive query from being closed and invalidated by the
> >>> StreamThread when it commits the transaction, which would cause those
> >>> interactive queries to crash.
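
A minimal sketch of the separate "view" idea, assuming a plain map-backed store. `BaseStore` and `TransactionView` are hypothetical names for illustration and are not the KIP's interfaces; iterators and rollback on failure are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical types for illustration; not the KIP's actual interfaces.
final class BaseStore {
    private final Map<String, String> committed = new HashMap<>();

    // Interactive queries read here and never see uncommitted writes.
    synchronized String get(String key) {
        return committed.get(key);
    }

    // The writing thread gets a separate view that buffers its own writes.
    TransactionView newTransaction() {
        return new TransactionView(this);
    }

    synchronized void apply(Map<String, String> writes) {
        committed.putAll(writes);
    }
}

final class TransactionView {
    private final BaseStore base;
    private final Map<String, String> pending = new HashMap<>();

    TransactionView(BaseStore base) {
        this.base = base;
    }

    void put(String key, String value) {
        pending.put(key, value);
    }

    // Reads through the view see pending writes first, then committed data.
    String get(String key) {
        return pending.containsKey(key) ? pending.get(key) : base.get(key);
    }

    void commit() {
        base.apply(pending);
        pending.clear();
    }

    void rollback() {
        pending.clear();
    }
}
```

Because query threads only ever hold a reference to the base store, no per-call thread check is needed in this model.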
> >>>
> >>> Another reason I believe I implemented it this way was a separation of
> >>> concerns. Recall that newTransaction() originally created an object of type
> >>> Transaction, not StateStore. My intent was to improve the type-safety of
> >>> the API, in an effort to ensure Transactions weren't used incorrectly.
> >>> Unfortunately, this didn't pan out, but newTransaction() remained.
> >>>
> >>> Finally, this had the added benefit that implementations could easily add
> >>> support for transactions *without* re-writing their existing,
> >>> non-transactional implementation. I think this can be a benefit both for
> >>> implementers of custom StateStores and for anyone extending
> >>> RocksDbStore, as they can rely on the existing access methods working how
> >>> they expect them to.
> >>>
> >>> I'm not too happy with the way the current design has panned out, so I'm
> >>> open to ideas on how to improve it. Key to this is finding some way to
> >>> ensure that reads from Interactive Query threads are properly isolated from
> >>> the transaction, *without* the performance overhead of checking which
> >>> thread the method is being called from on every access.
> >>>
> >>> As for replacing flush() with commit() - I saw no reason to add this
> >>> complexity to the KIP, unless there was a need to add arguments to the
> >>> flush/commit method. This need arises with Atomic Checkpointing, but that
> >>> will be implemented separately, in a future KIP. Do you see a need for some
> >>> arguments to the flush/commit method that I've missed? Or were you simply
> >>> suggesting a rename?
> >>>
> >>> 2.
> >>> This is simply due to the practical reason that isolationLevel() is really
> >>> a proxy for checking if the app is under EOS. The application configuration
> >>> is not provided to the constructor of StateStores, but it *is* provided to
> >>> init(), via StateStoreContext. For this reason, it seemed somewhat natural
> >>> to add it to StateStoreContext. I think this makes sense, since the
> >>> IsolationLevel of all StateStores in an application *must* be the same, and
> >>> since those stores are all initialized with the same StateStoreContext, it
> >>> seems natural for that context to carry the desired IsolationLevel to use.
> >>>
> >>> 3.
> >>> Using IsolationLevel instead of just passing `boolean eosEnabled`, as
> >>> much of the internals does, was an attempt to logically de-couple the
> >>> StateStore API from the internals of Kafka Streams. Technically, StateStores
> >>> don't need to know/care what processing mode the KS app is using; all they
> >>> need to know is the isolation level expected of them.
> >>>
> >>> Having formal definitions for the expectations of the two required
> >>> IsolationLevels allows implementers to implement transactional stores
> >>> without having to dig through the internals of Kafka Streams and understand
> >>> exactly how they are used. The tight coupling between state stores and
> >>> internal behaviour has actually significantly hindered my progress on this
> >>> KIP, and encouraged me to avoid increasing this logical coupling as much as
> >>> possible.
> >>>
> >>> This also frees implementations to satisfy those requirements in any way
> >>> they choose. Transactions might not be the only approach, or even an
> >>> available one, for a given implementation, which might have an alternative
> >>> way to satisfy the isolation requirements. I admit that this point is more
> >>> about semantics, but "transactional" would need to be formally defined in
> >>> order for implementers to provide a valid implementation, and these
> >>> IsolationLevels provide that formal definition.
> >>>
> >>> 4.
> >>> I can remove them. I added them only as I planned to include them in the
> >>> org.apache.kafka.streams.state package, as a recommended base
> >>> implementation for all StateStores, including those implemented by users. I
> >>> had assumed that anything in "public" packages, such as
> >>> org.apache.kafka.streams.state, should be included in a KIP. Is that wrong?
> >>>
> >>> 5.
> >>> RocksDB provides no way to measure the actual size of a
> >>> WriteBatch(WithIndex), so we're limited to tracking the sum total of the
> >>> size of keys + values that are written to the transaction. This obviously
> >>> under-estimates the actual memory usage, because WriteBatch no doubt
> >>> includes some per-record overhead, and WriteBatchWithIndex has to maintain
> >>> an index.
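
The key + value accounting described above amounts to a running counter. A minimal sketch follows; `UncommittedBytesTracker` is a hypothetical class, and only the method name `approximateNumUncommittedBytes` is borrowed from the thread. Any overhead of the underlying batch or its index is deliberately not counted, which is why the figure is a lower bound.

```java
// Hypothetical tracker; it only mirrors the key + value byte counting described above.
final class UncommittedBytesTracker {
    private long approximateBytes = 0;

    // Called for every write added to the ongoing transaction.
    void onPut(byte[] key, byte[] value) {
        // A null value (a delete) contributes only its key.
        approximateBytes += key.length + (value == null ? 0 : value.length);
    }

    // Called once the transaction has been committed.
    void onCommit() {
        approximateBytes = 0;
    }

    // Lower bound on memory used: batch and index overhead are not included.
    long approximateNumUncommittedBytes() {
        return approximateBytes;
    }
}
```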
> >>>
> >>> Ideally, we could trivially add a method upstream to WriteBatchInterface
> >>> that provides the exact size of the batch, but that would require an
> >>> upgrade of RocksDB, which won't happen soon. For the time being, we're
> >>> stuck with an approximation, and I felt that the new method's name should
> >>> reflect that.
> >>>
> >>> Would you prefer that the new method name ignore this constraint, and that
> >>> we simply make the RocksDB measurement more accurate in the future?
> >>>
> >>> 6.
> >>> Done
> >>>
> >>> 7.
> >>> Very good point. The KIP already specifically calls out memory in the
> >>> documentation of the config: "Maximum number of memory bytes to be used to
> >>> buffer uncommitted state-store records." - did you have something else in
> >>> mind?
> >>>
> >>> Should we also make this clearer by renaming the config property itself?
> >>> Perhaps to something like statestore.transaction.buffer.max.bytes?
> >>>
> >>> 8.
> >>> OK, I can remove this. The intent here was to describe how Streams itself
> >>> will manage transaction roll-over etc. Presumably that means we also don't
> >>> need a description of how Streams will manage the commit of changelog
> >>> transactions, state store transactions and checkpointing?
> >>>
> >>> 9.
> >>> What do you mean by fail-over? Do you mean failing over an Active Task to
> >>> an instance already hosting a Standby Task?
> >>>
> >>> Thanks again and sorry for the essay of a response!
> >>>
> >>> Regards,
> >>> Nick
> >>>
> >>> On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> wrote:
> >>>
> >>>> Hi Nick,
> >>>>
> >>>> Thanks for the updates!
> >>>>
> >>>> I really appreciate that you simplified the KIP by removing some
> >>>> aspects. As I have already told you, I think the removed aspects are
> >>>> also good ideas and we can discuss them on follow-up KIPs.
> >>>>
> >>>> Regarding the current KIP, I have the following feedback.
> >>>>
> >>>> 1.
> >>>> Is there a good reason to add method newTransaction() to the StateStore
> >>>> interface? As far as I understand, the idea is that users of a state
> >>>> store (transactional or not) call this method at start-up and after each
> >>>> commit. Since the call to newTransaction() is done in any case, I
> >>>> think it would simplify the caller code if we just started a new
> >>>> transaction after a commit in the implementation.
> >>>> As far as I understand, you plan to commit the transaction in the
> >>>> flush() method. I find the idea to replace flush() with commit()
> >>>> presented in KIP-844 an elegant solution.
> >>>>
> >>>> 2.
> >>>> Why is the method to query the isolation level added to the state store
> >>>> context?
> >>>>
> >>>> 3.
> >>>> Do we need all the isolation level definitions? I think it is good to
> >>>> know the guarantees of the transactionality of the state store.
> >>>> However, currently, Streams guarantees that there will only be one
> >>>> transaction that writes to the state store. Only the stream thread that
> >>>> executes the active task that owns the state store will write to the
> >>>> state store. I think it should be enough to know if the state store is
> >>>> transactional or not. So my proposal would be to just add a method on
> >>>> the state store interface that returns whether a state store is
> >>>> transactional or not, as a boolean or an enum.
> >>>>
> >>>> 4.
> >>>> I am wondering why AbstractTransaction and AbstractTransactionalStore
> >>>> are part of the KIP. They look like implementation details that should
> >>>> not be exposed in the public API.
> >>>>
> >>>> 5.
> >>>> Why does StateStore#approximateNumUncommittedBytes() return an
> >>>> approximate number of bytes?
> >>>>
> >>>> 6.
> >>>> RocksDB is just one implementation of the state stores in Streams.
> >>>> However, the issues regarding OOM errors might also apply to other
> >>>> custom implementations. So in the KIP I would extract that part from
> >>>> section "RocksDB Transaction". I would also move section "RocksDB
> >>>> Transaction" to the end of section "Proposed Changes" and handle it as
> >>>> an example implementation for a state store.
> >>>>
> >>>> 7.
> >>>> Should statestore.uncommitted.max.bytes limit all uncommitted bytes,
> >>>> or only the uncommitted bytes that reside in memory? In future, other
> >>>> transactional state store implementations might implement a buffer for
> >>>> uncommitted records that is able to spill records to disk. I think
> >>>> statestore.uncommitted.max.bytes needs to limit the uncommitted bytes
> >>>> irrespective of whether they reside in memory or on disk. Since Streams
> >>>> will use this config to decide if it needs to trigger a commit, state store
> >>>> implementations that can spill to disk will never be able to spill to
> >>>> disk. You would only need to change the doc of the config, if you agree
> >>>> with me.
> >>>>
> >>>> 8.
> >>>> Section "Transaction Management" about the wrappers is rather an
> >>>> implementation detail that should not be in the KIP.
> >>>>
> >>>> 9.
> >>>> Could you add a section that describes how failover will work with the
> >>>> transactional state stores? I think section "Error handling" is already
> >>>> a good start.
> >>>>
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On 15.05.23 11:04, Nick Telford wrote:
> >>>>> Hi everyone,
> >>>>>
> >>>>> Quick update: I've added a new section to the KIP: "Offsets for Consumer
> >>>>> Rebalances", that outlines my solution to the problem that
> >>>>> StreamsPartitionAssignor needs to read StateStore offsets even if they're
> >>>>> not currently open.
> >>>>>
> >>>>> Regards,
> >>>>> Nick
> >>>>>
> >>>>> On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Bruno,
> >>>>>>
> >>>>>> Thanks for reviewing my proposal.
> >>>>>>
> >>>>>> 1.
> >>>>>> The main reason I added it was because it was easy to do. If we see no
> >>>>>> value in it, I can remove it.
> >>>>>>
> >>>>>> 2.
> >>>>>> Global StateStores can have multiple partitions in their input topics
> >>>>>> (which function as their changelogs), so they would have more than one
> >>>>>> partition.
> >>>>>>
> >>>>>> 3.
> >>>>>> That's a good point. At present, the only method it adds is
> >>>>>> isolationLevel(), which is likely not necessary outside of StateStores.
> >>>>>> It *does* provide slightly different guarantees in the documentation to
> >>>>>> several of the methods (hence the overrides). I'm not sure if this is
> >>>>>> enough to warrant a new interface though.
> >>>>>> I think the question that remains is whether this interface makes it
> >>>>>> easier to implement custom transactional StateStores than if we were to
> >>>>>> remove it? Probably not.
> >>>>>>
> >>>>>> 4.
> >>>>>> The main motivation for the Atomic Checkpointing is actually performance.
> >>>>>> My team has been testing out an implementation of this KIP without it, and
> >>>>>> we had problems with RocksDB doing *much* more compaction, due to the
> >>>>>> significantly increased flush rate. It was enough of a problem that (for
> >>>>>> the time being) we had to revert to Kafka Streams proper.
> >>>>>> I think the best way to solve this, as you say, is to keep the .checkpoint
> >>>>>> files *in addition* to the offsets being stored within the store itself.
> >>>>>> Essentially, when closing StateStores, we force a memtable flush, then
> >>>>>> call getCommittedOffsets and write those out to the .checkpoint file.
> >>>>>> That would ensure the metadata is available to the
> >>>>>> StreamsPartitionAssignor for all closed stores.
> >>>>>> If there's a crash (no clean close), then we won't be able to guarantee
> >>>>>> which offsets were flushed to disk by RocksDB, so we'd need to open
> >>>>>> (init()), read offsets, and then close() those stores. But since this is
> >>>>>> the exception, and will only occur once (provided it doesn't crash every
> >>>>>> time!), I think the performance impact here would be acceptable.
> >>>>>>
> >>>>>> Thanks for the feedback, please let me know if you have any more comments
> >>>>>> or questions!
> >>>>>>
> >>>>>> I'm currently working on rebasing against trunk. This involves adding
> >>>>>> support for transactionality to VersionedStateStores. I will probably need
> >>>>>> to revise my implementation for transactional "segmented" stores, both to
> >>>>>> accommodate VersionedStateStore, and to clean up some other stuff.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Nick
> >>>>>>
> >>>>>>
> >>>>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Nick,
> >>>>>>>
> >>>>>>> Thanks for the updates!
> >>>>>>>
> >>>>>>> I have a couple of questions/comments.
> >>>>>>>
> >>>>>>> 1.
> >>>>>>> Why do you propose a configuration that involves max. bytes and max.
> >>>>>>> records? I think we are mainly concerned about memory consumption because
> >>>>>>> we want to limit the off-heap memory used. I cannot think of a case
> >>>>>>> where one would want to set the max. number of records.
> >>>>>>>
> >>>>>>>
> >>>>>>> 2.
> >>>>>>> Why does
> >>>>>>>
> >>>>>>>      default void commit(final Map<TopicPartition, Long> changelogOffsets) {
> >>>>>>>          flush();
> >>>>>>>      }
> >>>>>>>
> >>>>>>> take a map of partitions to changelog offsets?
> >>>>>>> The mapping between state stores and partitions is a 1:1 relationship.
> >>>>>>> Passing in a single changelog offset should suffice.
> >>>>>>>
> >>>>>>>
> >>>>>>> 3.
> >>>>>>> Why do we need the Transaction interface? It should be possible to hide
> >>>>>>> beginning and committing a transaction within the state store
> >>>>>>> implementation, so that from outside the state store, it does not matter
> >>>>>>> whether the state store is transactional or not. What would be the
> >>>>>>> advantage of using the Transaction interface?
> >>>>>>>
> >>>>>>>
> >>>>>>> 4.
> >>>>>>> Regarding checkpointing offsets, I think we should keep the checkpoint
> >>>>>>> file in any case for the reason you mentioned about rebalancing. Even if
> >>>>>>> that would not be an issue, I would propose to move the change to offset
> >>>>>>> management to a new KIP and to not add more complexity than needed to
> >>>>>>> this one. I would not be too concerned about the consistency violation
> >>>>>>> you mention. As far as I understand, with transactional state stores
> >>>>>>> Streams would write the checkpoint file during every commit even under
> >>>>>>> EOS. In the failure case you describe, Streams would restore the state
> >>>>>>> stores from the offsets found in the checkpoint file written during the
> >>>>>>> penultimate commit instead of during the last commit. Basically, Streams
> >>>>>>> would overwrite the records written to the state store between the last
> >>>>>>> two commits with the same records read from the changelogs. While I
> >>>>>>> understand that this is wasteful, it is -- at the same time --
> >>>>>>> acceptable and most importantly it does not break EOS.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>>
> >>>>>>> On 27.04.23 12:34, Nick Telford wrote:
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> I find myself (again) considering removing the offset management from
> >>>>>>>> StateStores, and keeping the old checkpoint file system. The reason is that
> >>>>>>>> the StreamsPartitionAssignor directly reads checkpoint files in order to
> >>>>>>>> determine which instance has the most up-to-date copy of the local state.
> >>>>>>>> If we move offsets into the StateStore itself, then we will need to open,
> >>>>>>>> initialize, read offsets and then close each StateStore (that is not
> >>>>>>>> already assigned and open) for which we have *any* local state, on every
> >>>>>>>> rebalance.
> >>>>>>>>
> >>>>>>>> Generally, I don't think there are many "orphan" stores like this sitting
> >>>>>>>> around on most instances, but even a few would introduce additional latency
> >>>>>>>> to an already somewhat lengthy rebalance procedure.
> >>>>>>>>
> >>>>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping things in the
> >>>>>>>> checkpoint file(s) for now, and not worrying about the race. The downside
> >>>>>>>> is that we wouldn't be able to remove the explicit RocksDB flush on-commit,
> >>>>>>>> which likely hurts performance.
> >>>>>>>>
> >>>>>>>> If anyone has any thoughts or ideas on this subject, I would appreciate it!
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Nick
> >>>>>>>>
> >>>>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Colt,
> >>>>>>>>>
> >>>>>>>>> The issue is that if there's a crash between 2 and 3, then you still end
> >>>>>>>>> up with inconsistent data in RocksDB. The only way to guarantee that your
> >>>>>>>>> checkpoint offsets and locally stored data are consistent with each other
> >>>>>>>>> is to atomically commit them, which can be achieved by having the offsets
> >>>>>>>>> stored in RocksDB.
> >>>>>>>>>
> >>>>>>>>> The offsets column family is likely to be extremely small (one
> >>>>>>>>> per changelog partition + one per Topology input partition for regular
> >>>>>>>>> stores, one per input partition for global stores). So the overhead will be
> >>>>>>>>> minimal.
> >>>>>>>>>
> >>>>>>>>> A major benefit of doing this is that we can remove the explicit calls to
> >>>>>>>>> db.flush(), which forcibly flushes memtables to disk on-commit. It turns
> >>>>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka Streams
> >>>>>>>>> commits, *not* RocksDB configuration, which could be a major source of
> >>>>>>>>> confusion. Atomic checkpointing makes it safe to remove these explicit
> >>>>>>>>> flushes, because it no longer matters exactly when RocksDB flushes data to
> >>>>>>>>> disk; since the data and corresponding checkpoint offsets will always be
> >>>>>>>>> flushed together, the local store is always in a consistent state, and
> >>>>>>>>> on-restart, it can always safely resume restoration from the on-disk
> >>>>>>>>> offsets, restoring the small amount of data that hadn't been flushed when
> >>>>>>>>> the app exited/crashed.
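
Resuming restoration from a persisted offset can be sketched as a replay loop. The types below are hypothetical, with strings standing in for byte arrays, and the sketch assumes the persisted offset is that of the last changelog entry already reflected in the store. It also illustrates why replaying from an older offset is wasteful but harmless: each entry overwrites its key with the same value.

```java
import java.util.List;
import java.util.Map;

// Hypothetical changelog entry; strings stand in for byte[] keys and values.
final class ChangelogEntry {
    final long offset;
    final String key;
    final String value;

    ChangelogEntry(long offset, String key, String value) {
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}

final class RestoreSketch {
    // Re-applies every entry after persistedOffset and returns the new offset.
    // Assumes persistedOffset is the offset of the last entry already in the store.
    static long restore(Map<String, String> store,
                        long persistedOffset,
                        List<ChangelogEntry> changelog) {
        long latest = persistedOffset;
        for (ChangelogEntry entry : changelog) {
            if (entry.offset > persistedOffset) {
                store.put(entry.key, entry.value);
                latest = entry.offset;
            }
        }
        return latest;
    }
}
```

If data and offset are always flushed together, the loop only ever replays the entries that had not reached disk.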
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Nick
> >>>>>>>>>
> >>>>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> Nick,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for your reply. Ack to A) and B).
> >>>>>>>>>>
> >>>>>>>>>> For item C), I see what you're referring to. Your proposed solution will
> >>>>>>>>>> work, so no need to change it. What I was suggesting was that it might be
> >>>>>>>>>> possible to achieve this with only one column family. So long as:
> >>>>>>>>>>
> >>>>>>>>>>        - No uncommitted records (i.e. not committed to the changelog) are
> >>>>>>>>>>        *committed* to the state store, AND
> >>>>>>>>>>        - The Checkpoint offset (which refers to the changelog topic) is less
> >>>>>>>>>>        than or equal to the last written changelog offset in rocksdb
> >>>>>>>>>>
> >>>>>>>>>> I don't see the need to do the full restoration from scratch. My
> >>>>>>>>>> understanding was that prior to 844/892, full restorations were required
> >>>>>>>>>> because there could be uncommitted records written to RocksDB; however,
> >>>>>>>>>> given your use of RocksDB transactions, that can be avoided with the
> >>>>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB transaction, 3)
> >>>>>>>>>> update offset in checkpoint file.
> >>>>>>>>>>
> >>>>>>>>>> Anyways, your proposed solution works equivalently and I don't believe
> >>>>>>>>>> there is much overhead to an additional column family in RocksDB. Perhaps
> >>>>>>>>>> it may even perform better than making separate writes to the checkpoint
> >>>>>>>>>> file.
> >>>>>>>>>>
> >>>>>>>>>> Colt McNealy
> >>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>
> >>>>>>>>>>> A. I've done my best to de-couple the StateStore stuff from the rest of the
> >>>>>>>>>>> Streams engine. The fact that there will be only one ongoing (write)
> >>>>>>>>>>> transaction at a time is not guaranteed by any API, and is just a
> >>>>>>>>>>> consequence of the way Streams operates. To that end, I tried to ensure the
> >>>>>>>>>>> documentation and guarantees provided by the new APIs are independent of
> >>>>>>>>>>> this incidental behaviour. In practice, you're right, this essentially
> >>>>>>>>>>> refers to "interactive queries", which are technically "read transactions",
> >>>>>>>>>>> even if they don't actually use the transaction API to isolate themselves.
> >>>>>>>>>>>
> >>>>>>>>>>> B. Yes, although not ideal. This is for backwards compatibility, because:
> >>>>>>>>>>>         1) Existing custom StateStore implementations will implement flush(),
> >>>>>>>>>>> and not commit(), but the Streams engine now calls commit(), so those calls
> >>>>>>>>>>> need to be forwarded to flush() for these legacy stores.
> >>>>>>>>>>>         2) Existing StateStore *users*, i.e. outside of the Streams engine
> >>>>>>>>>>> itself, may depend on explicitly calling flush(), so for these cases,
> >>>>>>>>>>> flush() needs to be redirected to call commit().
> >>>>>>>>>>> If anyone has a better way to guarantee compatibility without introducing
> >>>>>>>>>>> this potential recursion loop, I'm open to changes!
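
The mutual delegation between flush() and commit() can be sketched with two default methods. `SketchStateStore` is a simplified stand-in for the real interface, and string keys replace `TopicPartition` so the example compiles on its own; the two implementing classes are hypothetical.

```java
import java.util.Map;

// Simplified stand-in for the StateStore interface: only the two methods under discussion.
interface SketchStateStore {
    // Callers that still invoke flush() end up in commit().
    default void flush() {
        commit(Map.of());
    }

    // Legacy stores that only implement flush() still get called by the engine.
    default void commit(Map<String, Long> changelogOffsets) {
        flush();
    }
}

// A legacy store overrides flush() only.
final class LegacyStore implements SketchStateStore {
    int flushes = 0;

    @Override
    public void flush() {
        flushes++;
    }
}

// A newer store overrides commit() only.
final class TransactionalStore implements SketchStateStore {
    int commits = 0;

    @Override
    public void commit(Map<String, Long> changelogOffsets) {
        commits++;
    }
}
```

A store that overrides neither method would recurse until the stack overflows, which is the loop raised in question B.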
> >>>>>>>>>>>
> >>>>>>>>>>> C. This is described in the "Atomic Checkpointing" section. Offsets are
> >>>>>>>>>>> stored in a separate RocksDB column family, which is guaranteed to be
> >>>>>>>>>>> atomically flushed to disk with all other column families. The issue of
> >>>>>>>>>>> checkpoints being written to disk after commit causing inconsistency if it
> >>>>>>>>>>> crashes in between is the reason why, under EOS, checkpoint files are only
> >>>>>>>>>>> written on clean shutdown. This is one of the major causes of "full
> >>>>>>>>>>> restorations", so moving the offsets into a place where they can be
> >>>>>>>>>>> guaranteed to be atomically written with the data they checkpoint allows us
> >>>>>>>>>>> to write the checkpoint offsets *on every commit*, not just on clean
> >>>>>>>>>>> shutdown.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Nick
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Nick,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for continuing this work. I have a few minor clarifying
> >>>>>>>>>>>> questions.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A) "Records written to any transaction are visible to all other
> >>>>>>>>>>>> transactions immediately." I am confused here: I thought there could only be
> >>>>>>>>>>>> one transaction going on at a time for a given state store given the
> >>>>>>>>>>>> threading model for processing records on a Task. Do you mean Interactive
> >>>>>>>>>>>> Queries by "other transactions"? (If so, then everything makes sense; I
> >>>>>>>>>>>> thought that since IQ were read-only then they didn't count as
> >>>>>>>>>>>> transactions).
> >>>>>>>>>>>>
> >>>>>>>>>>>> B) Is it intentional that the default implementations of the flush() and
> >>>>>>>>>>>> commit() methods in the StateStore class refer to each other in some sort
> >>>>>>>>>>>> of unbounded recursion?
> >>>>>>>>>>>>
> >>>>>>>>>>>> C) How will the getCommittedOffset() method work? At first I thought the
> >>>>>>>>>>>> way to do it would be using a special key in the RocksDB store to store the
> >>>>>>>>>>>> offset, and committing that with the transaction. But upon second thought,
> >>>>>>>>>>>> since restoration from the changelog is an idempotent procedure, I think it
> >>>>>>>>>>>> would be fine to 1) commit the RocksDB transaction and then 2) write the
> >>>>>>>>>>>> offset to disk in a checkpoint file. If there is a crash between 1) and 2),
> >>>>>>>>>>>> I think the only downside is now we replay a few more records (at a cost of
> >>>>>>>>>>>> <100ms). Am I missing something there?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Other than that, everything makes sense to me.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I've updated the KIP to reflect the latest version of the design:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> There are several changes in there that reflect feedback from this thread,
> >>>>>>>>>>>>> and there's a new section and a bunch of interface changes relating to
> >>>>>>>>>>>>> Atomic Checkpointing, which is the final piece of the puzzle to making
> >>>>>>>>>>>>> everything robust.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Let me know what you think!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Lucas,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for looking over my KIP.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a typo in the
> >>>>>>>>>>>>>> KIP that I've now corrected. It was originally per-Task, but I
> >>>>>>>>>>>>>> changed it to per-instance for exactly the reason you highlighted.
> >>>>>>>>>>>>>> B) It's worth noting that transactionality is only enabled under
> >>>>>>>>>>>>>> EOS, and in the default mode of operation (ALOS), there should be no
> >>>>>>>>>>>>>> change in behavior at all. I think, under EOS, we can mitigate the
> >>>>>>>>>>>>>> impact on users by sufficiently low default values for the memory
> >>>>>>>>>>>>>> bound configuration. I understand your hesitation to include a
> >>>>>>>>>>>>>> significant change of behaviour, especially in a minor release, but
> >>>>>>>>>>>>>> I suspect that most users will prefer the memory impact (under EOS)
> >>>>>>>>>>>>>> to the existing behaviour of frequent state restorations! If this is
> >>>>>>>>>>>>>> a problem, the changes can wait until the next major release. I'll
> >>>>>>>>>>>>>> be running a patched version of streams in production with these
> >>>>>>>>>>>>>> changes as soon as they're ready, so it won't disrupt me :-D
> >>>>>>>>>>>>>> C) The main purpose of this sentence was just to note that some
> >>>>>>>>>>>>>> changes will need to be made to the way Segments are handled in
> >>>>>>>>>>>>>> order to ensure they also benefit from transactions. At the time I
> >>>>>>>>>>>>>> wrote it, I hadn't figured out the specific changes necessary, so it
> >>>>>>>>>>>>>> was deliberately vague. This is the one outstanding problem I'm
> >>>>>>>>>>>>>> currently working on, and I'll update this section with more detail
> >>>>>>>>>>>>>> once I have figured out the exact changes required.
> >>>>>>>>>>>>>> D) newTransaction() provides the necessary isolation guarantees.
> >>>>>>>>>>>>>> While the RocksDB implementation of transactions doesn't technically
> >>>>>>>>>>>>>> *need* read-only users to call newTransaction(), other
> >>>>>>>>>>>>>> implementations (e.g. a hypothetical PostgresStore) may require it.
> >>>>>>>>>>>>>> Calling newTransaction() when no transaction is necessary is
> >>>>>>>>>>>>>> essentially free, as it will just return this.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think it should
> >>>>>>>>>>>>>> be fairly obvious where the performance problems stem from: writes
> >>>>>>>>>>>>>> under KIP-844 require 3 extra memory-copies: 1 to encode it with the
> >>>>>>>>>>>>>> tombstone/record flag, 1 to decode it from the tombstone/record
> >>>>>>>>>>>>>> flag, and 1 to copy the record from the "temporary" store to the
> >>>>>>>>>>>>>> "main" store, when the transaction commits. The different approach
> >>>>>>>>>>>>>> taken by KIP-892 should perform much better, as it avoids all these
> >>>>>>>>>>>>>> copies, and may actually perform slightly better than trunk, due to
> >>>>>>>>>>>>>> batched writes in RocksDB performing better than non-batched
> >>>>>>>>>>>>>> writes.[1]
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >>>>>>>>>>>>>>
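The first two of the extra copies attributed to the KIP-844 approach above (prefixing every value with a tombstone/record flag, then stripping it again) can be illustrated with plain byte arrays. This is a minimal sketch, not the KIP-844 code: the class name `FlaggedValueSketch` and the flag byte values are assumptions chosen for illustration only.

```java
import java.util.Arrays;

/** Illustrates why a value/tombstone flag prefix costs one array copy each way. */
class FlaggedValueSketch {
    // Hypothetical marker bytes; the real encoding may differ.
    static final byte TOMBSTONE = 0x00;
    static final byte VALUE = 0x01;

    /** Copy #1: a new array is allocated so the flag can be prepended. */
    static byte[] encode(byte[] valueOrNull) {
        if (valueOrNull == null) {
            return new byte[] {TOMBSTONE}; // a delete carries no payload
        }
        byte[] flagged = new byte[valueOrNull.length + 1];
        flagged[0] = VALUE;
        System.arraycopy(valueOrNull, 0, flagged, 1, valueOrNull.length);
        return flagged;
    }

    /** Copy #2: the payload is copied out again; null means "deleted". */
    static byte[] decode(byte[] flagged) {
        if (flagged[0] == TOMBSTONE) {
            return null;
        }
        return Arrays.copyOfRange(flagged, 1, flagged.length);
    }

    public static void main(String[] args) {
        byte[] roundTripped = decode(encode(new byte[] {10, 20, 30}));
        System.out.println(Arrays.toString(roundTripped));
        System.out.println(decode(encode(null)) == null);
    }
}
```

The third copy mentioned above (moving each record from the temporary store into the main store on commit) is not modelled here.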
> >>>>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy
> >>>>>>>>>>>>>> <lbruts...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm just starting to read up on the whole discussion about KIP-892
> >>>>>>>>>>>>>>> and KIP-844. Thanks a lot for your work on this, I do think
> >>>>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do have some
> >>>>>>>>>>>>>>> questions about the latest draft.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> A) If I understand correctly, you propose to put a bound on the
> >>>>>>>>>>>>>>> (native) memory consumed by each task. However, I wonder if this is
> >>>>>>>>>>>>>>> sufficient if we have temporary imbalances in the cluster. For
> >>>>>>>>>>>>>>> example, depending on the timing of rebalances during a cluster
> >>>>>>>>>>>>>>> restart, it could happen that a single streams node is assigned a
> >>>>>>>>>>>>>>> lot more tasks than expected. With your proposed change, this would
> >>>>>>>>>>>>>>> mean that the memory required by this one node could be a multiple
> >>>>>>>>>>>>>>> of what is required during normal operation. I wonder if it
> >>>>>>>>>>>>>>> wouldn't be safer to put a global bound on the memory use, across
> >>>>>>>>>>>>>>> all tasks.
> >>>>>>>>>>>>>>> B) Generally, the memory concerns still give me the feeling that
> >>>>>>>>>>>>>>> this should not be enabled by default for all users in a minor
> >>>>>>>>>>>>>>> release.
> >>>>>>>>>>>>>>> C) In section "Transaction Management": the sentence "A similar
> >>>>>>>>>>>>>>> analogue will be created to automatically manage `Segment`
> >>>>>>>>>>>>>>> transactions.". Maybe this is just me lacking some background, but
> >>>>>>>>>>>>>>> I do not understand this, it would be great if you could clarify
> >>>>>>>>>>>>>>> what you mean here.
> >>>>>>>>>>>>>>> D) Could you please clarify why IQ has to call newTransaction(),
> >>>>>>>>>>>>>>> when it's read-only.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> And one last thing not strictly related to your KIP: if there is an
> >>>>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x slower
> >>>>>>>>>>>>>>> (e.g. by providing a flame graph), that would be quite interesting.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <nick.telf...@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I've updated the KIP with a more detailed design, which reflects
> >>>>>>>>>>>>>>>> the implementation I've been working on:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This new design should address the outstanding points already made
> >>>>>>>>>>>>>>>> in the thread.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Please let me know if there are areas that are unclear or need
> >>>>>>>>>>>>>>>> more clarification.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have a (nearly) working implementation. I'm confident that the
> >>>>>>>>>>>>>>>> remaining work (making Segments behave) will not impact the
> >>>>>>>>>>>>>>>> documented design.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was hoping that,
> >>>>>>>>>>>>>>>>> since reading uncommitted records from IQ in EOS isn't part of
> >>>>>>>>>>>>>>>>> the documented API, maybe you *wouldn't* have to wait for the
> >>>>>>>>>>>>>>>>> next major release to make that change; but given that it would
> >>>>>>>>>>>>>>>>> be considered a major change, I like your approach the best.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford
> >>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended to try to
> >>>>>>>>>>>>>>>>>> keep the behaviour unchanged as much as possible, otherwise we'd
> >>>>>>>>>>>>>>>>>> have to wait for a major version release to land these changes.
> >>>>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of guarantee,
> >>>>>>>>>>>>>>>>>> and the typically longer commit intervals would be problematic
> >>>>>>>>>>>>>>>>>> when reading only "committed" records.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor surgery, but I
> >>>>>>>>>>>>>>>>>> spent a considerable amount of that time working through ideas
> >>>>>>>>>>>>>>>>>> for possible solutions in my head. I think your suggestion of
> >>>>>>>>>>>>>>>>>> keeping ALOS as-is, but buffering writes for EOS is the right
> >>>>>>>>>>>>>>>>>> path forwards, although I have a solution that both expands on
> >>>>>>>>>>>>>>>>>> this, and provides for some more formal guarantees.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for
> >>>>>>>>>>>>>>>>>> "Transactions", with clearly defined IsolationLevels. Using
> >>>>>>>>>>>>>>>>>> "Read Committed" when under EOS, and "Read Uncommitted" under
> >>>>>>>>>>>>>>>>>> ALOS.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> The nice thing about this approach is that it gives us much more
> >>>>>>>>>>>>>>>>>> clearly defined isolation behaviour that can be properly
> >>>>>>>>>>>>>>>>>> documented to ensure users know what to expect.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and will update
> >>>>>>>>>>>>>>>>>> the KIP when I have something. The main struggle is trying to
> >>>>>>>>>>>>>>>>>> implement this without making any major changes to the existing
> >>>>>>>>>>>>>>>>>> interfaces or breaking existing implementations, because
> >>>>>>>>>>>>>>>>>> currently everything expects to operate directly on a
> >>>>>>>>>>>>>>>>>> StateStore, and not a Transaction of that store. I think I'm
> >>>>>>>>>>>>>>>>>> getting close, although sadly I won't be able to progress much
> >>>>>>>>>>>>>>>>>> until next week due to some work commitments.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>
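The idea above of a store with buffered writes and a per-read isolation level can be sketched with two in-memory maps. This is a hedged illustration only, not the KIP-892 interfaces: `IsolationSketch`, `BufferedStore`, and the method signatures are hypothetical names introduced here, and the `IsolationLevel` enum is a local stand-in rather than any Kafka class. Pending writes live in a buffer; "Read Uncommitted" consults the buffer first, while "Read Committed" only sees state that has been committed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class IsolationSketch {
    /** Local stand-in for the isolation levels named in the discussion. */
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    /** Hypothetical store: committed state plus a buffer of pending writes. */
    static class BufferedStore {
        private final Map<String, String> committed = new HashMap<>();
        // Optional.empty() marks a pending delete (a tombstone in the buffer).
        private final Map<String, Optional<String>> pending = new HashMap<>();

        void put(String key, String value) {
            pending.put(key, Optional.of(value));
        }

        void delete(String key) {
            pending.put(key, Optional.empty());
        }

        /** Returns the visible value for the key, or null if none is visible. */
        String get(String key, IsolationLevel level) {
            if (level == IsolationLevel.READ_UNCOMMITTED && pending.containsKey(key)) {
                return pending.get(key).orElse(null); // buffer overrides committed state
            }
            return committed.get(key);
        }

        /** Applies all pending writes to the committed state, then clears the buffer. */
        void commit() {
            pending.forEach((key, value) -> {
                if (value.isPresent()) {
                    committed.put(key, value.get());
                } else {
                    committed.remove(key);
                }
            });
            pending.clear();
        }
    }

    public static void main(String[] args) {
        BufferedStore store = new BufferedStore();
        store.put("k", "v1");
        System.out.println(store.get("k", IsolationLevel.READ_COMMITTED));
        System.out.println(store.get("k", IsolationLevel.READ_UNCOMMITTED));
        store.commit();
        System.out.println(store.get("k", IsolationLevel.READ_COMMITTED));
    }
}
```

In this model the two levels differ only in whether the pending buffer is consulted on reads, so a single store implementation could serve both modes.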
> >>>>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thank you for the explanation, and also for the updated KIP. I
> >>>>>>>>>>>>>>>>>>> am quite eager for this improvement to be released as it would
> >>>>>>>>>>>>>>>>>>> greatly reduce the operational difficulties of EOS streams
> >>>>>>>>>>>>>>>>>>> apps.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Two questions:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 10)
> >>>>>>>>>>>>>>>>>>>> When reading records, we will use the
> >>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB and
> >>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex#newIteratorWithBase utilities in order to
> >>>>>>>>>>>>>>>>>>>> ensure that uncommitted writes are available to query.
> >>>>>>>>>>>>>>>>>>> Why do extra work to enable the reading of uncommitted writes
> >>>>>>>>>>>>>>>>>>> during IQ? Code complexity aside, reading uncommitted writes
> >>>>>>>>>>>>>>>>>>> is, in my opinion, a minor flaw in EOS IQ; it would be very
> >>>>>>>>>>>>>>>>>>> nice to have the guarantee that, with EOS, IQ only reads
> >>>>>>>>>>>>>>>>>>> committed records. In order to avoid dirty reads, one currently
> >>>>>>>>>>>>>>>>>>> must query a standby replica (but this still doesn't fully
> >>>>>>>>>>>>>>>>>>> guarantee monotonic reads).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 20) Is it also necessary to enable this optimization on ALOS
> >>>>>>>>>>>>>>>>>>> stores? The motivation of KIP-844 was mainly to reduce the need
> >>>>>>>>>>>>>>>>>>> to restore state from scratch on unclean EOS shutdowns; with
> >>>>>>>>>>>>>>>>>>> ALOS it was acceptable to accept that there may have been
> >>>>>>>>>>>>>>>>>>> uncommitted writes on disk. On a side note, if you enable this
> >>>>>>>>>>>>>>>>>>> type of store on ALOS processors, the community would
> >>>>>>>>>>>>>>>>>>> definitely want to enable queries on dirty reads; otherwise
> >>>>>>>>>>>>>>>>>>> users would have to wait 30 seconds (default) to see an update.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thank you for doing this fantastic work!
> >>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford
> >>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no longer
> >>>>>>>>>>>>>>>>>>>> include the StateStore management of checkpointing. This can
> >>>>>>>>>>>>>>>>>>>> be added as a KIP later on to further optimize the consistency
> >>>>>>>>>>>>>>>>>>>> and performance of state stores.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I've also added a section discussing some of the concerns
> >>>>>>>>>>>>>>>>>>>> around concurrency, especially in the presence of Iterators.
> >>>>>>>>>>>>>>>>>>>> I'm thinking of wrapping WriteBatchWithIndex with a
> >>>>>>>>>>>>>>>>>>>> reference-counting copy-on-write implementation (that only
> >>>>>>>>>>>>>>>>>>>> makes a copy if there's an active iterator), but I'm open to
> >>>>>>>>>>>>>>>>>>>> suggestions.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>
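The reference-counting copy-on-write idea mentioned above can be sketched over a plain sorted map. This is only one possible reading of that idea, written under assumptions: a `TreeMap` stands in for WriteBatchWithIndex, and the names `CopyOnWriteBufferSketch`, `Version`, `Snapshot`, and `copiesMade` are hypothetical. A writer copies the underlying map only when at least one reader still holds the current version; otherwise it mutates in place.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

class CopyOnWriteBufferSketch {
    /** One version of the buffer plus the number of readers still using it. */
    private static final class Version {
        final TreeMap<String, String> map;
        int openReaders = 0;
        Version(TreeMap<String, String> map) { this.map = map; }
    }

    /** Reader handle pinned to one version; close() releases the reference. */
    final class Snapshot implements AutoCloseable {
        private final Version version;
        private boolean closed = false;
        private Snapshot(Version version) { this.version = version; }

        Iterator<Map.Entry<String, String>> iterator() {
            return version.map.entrySet().iterator();
        }

        @Override
        public void close() {
            synchronized (CopyOnWriteBufferSketch.this) {
                if (!closed) {
                    closed = true;
                    version.openReaders--;
                }
            }
        }
    }

    private Version current = new Version(new TreeMap<>());
    int copiesMade = 0; // exposed only so the behaviour can be observed

    /** Writes in place unless a reader holds the current version, then copies first. */
    synchronized void put(String key, String value) {
        if (current.openReaders > 0) {
            current = new Version(new TreeMap<>(current.map)); // readers keep the old map
            copiesMade++;
        }
        current.map.put(key, value);
    }

    /** Pins the current version for a reader. */
    synchronized Snapshot snapshot() {
        current.openReaders++;
        return new Snapshot(current);
    }

    public static void main(String[] args) {
        CopyOnWriteBufferSketch buffer = new CopyOnWriteBufferSketch();
        buffer.put("a", "1");
        try (Snapshot reader = buffer.snapshot()) {
            buffer.put("b", "2"); // triggers a copy because a reader is open
            reader.iterator().forEachRemaining(e -> System.out.println(e.getKey()));
        }
        buffer.put("c", "3"); // no reader on the current version, so no copy
        System.out.println(buffer.copiesMade);
    }
}
```

A reader in this model sees a stable view as of the moment it took its snapshot, and writers pay for a copy only while such a reader exists.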
> >>>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford
> >>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I didn't do any profiling, but the 844 implementation:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>    - Writes uncommitted records to a temporary RocksDB
> >>>>>>>>>>>>>>>>>>>>>      instance
> >>>>>>>>>>>>>>>>>>>>>       - Since tombstones need to be flagged, all record
> >>>>>>>>>>>>>>>>>>>>>         values are prefixed with a value/tombstone marker.
> >>>>>>>>>>>>>>>>>>>>>         This necessitates a memory copy.
> >>>>>>>>>>>>>>>>>>>>>    - On-commit, iterates all records in this temporary
> >>>>>>>>>>>>>>>>>>>>>      instance and writes them to the main RocksDB store.
> >>>>>>>>>>>>>>>>>>>>>    - While iterating, the value/tombstone marker needs to be
> >>>>>>>>>>>>>>>>>>>>>      parsed and the real value extracted. This necessitates
> >>>>>>>>>>>>>>>>>>>>>      another memory copy.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> My guess is that the cost of iterating the temporary RocksDB
> >>>>>>>>>>>>>>>>>>>>> store is the major factor, with the 2 extra memory copies
> >>>>>>>>>>>>>>>>>>>>> per-Record contributing a significant amount too.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:12, Colt McNealy
> >>>>>>>>>>>>>>>>>>>>> <c...@littlehorse.io> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Out of curiosity, why does the performance of the store
> >>>>>>>>>>>>>>>>>>>>>> degrade so significantly with the 844 implementation? I
> >>>>>>>>>>>>>>>>>>>>>> wouldn't be too surprised by a 50-60% drop (caused by each
> >>>>>>>>>>>>>>>>>>>>>> record being written twice), but 96% is extreme.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The only thing I can think of which could create such a
> >>>>>>>>>>>>>>>>>>>>>> bottleneck would be that perhaps the 844 implementation
> >>>>>>>>>>>>>>>>>>>>>> deserializes and then re-serializes the store values when
> >>>>>>>>>>>>>>>>>>>>>> copying from the uncommitted to committed store, but I
> >>>>>>>>>>>>>>>>>>>>>> wasn't able to figure that out when I scanned the PR.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford
> >>>>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I've updated the KIP to resolve all the points that have
> >>>>>>>>>>>>>>>>>>>>>>> been raised so far, with one exception: the ALOS default
> >>>>>>>>>>>>>>>>>>>>>>> commit interval of 5 minutes is likely to cause
> >>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex memory to grow too large.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> There's a couple of different things I can think of to
> >>>>>>>>>>>>>>>>>>>>>>> solve this:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>    - We already have a memory/record limit in the KIP to
> >>>>>>>>>>>>>>>>>>>>>>>      prevent OOM errors. Should we choose a default value
> >>>>>>>>>>>>>>>>>>>>>>>      for these? My concern here is that anything we choose
> >>>>>>>>>>>>>>>>>>>>>>>      might seem rather arbitrary. We could change its
> >>>>>>>>>>>>>>>>>>>>>>>      behaviour such that under ALOS, it only triggers the
> >>>>>>>>>>>>>>>>>>>>>>>      commit of the StateStore, but under EOS, it triggers a
> >>>>>>>>>>>>>>>>>>>>>>>      commit of the Kafka transaction.
> >>>>>>>>>>>>>>>>>>>>>>>    - We could introduce a separate `checkpoint.interval.ms`
> >>>>>>>>>>>>>>>>>>>>>>>      to allow ALOS to commit the StateStores more frequently
> >>>>>>>>>>>>>>>>>>>>>>>      than the general commit.interval.ms? My concern here is
> >>>>>>>>>>>>>>>>>>>>>>>      that the semantics of this config
