Potentially we could just go the memtable with RocksDB WriteBatches route if
the cache is disabled?

On Tue, 20 Jun 2023, 22:00 John Roesler, <j...@vvcephei.org> wrote:

> Touché!
>
> Ok, I agree that figuring out the case of a disabled cache would be
> non-trivial. Ingesting single-record SST files will probably not be
> performant, but benchmarking may prove different. Or maybe we can have
> some reserved cache space on top of the user-configured cache, which we
> would have reclaimed from the memtable space. Or some other, more
> creative solution.
>
> Thanks,
> -John
>
> On 6/20/23 15:30, Nick Telford wrote:
> >> Note that users can disable the cache, which would still be
> > ok, I think. We wouldn't ingest the SST files on every record, but just
> > append to them and only ingest them on commit, when we're already
> > waiting for acks and a RocksDB commit.
> >
> > In this case, how would uncommitted records be read by joins?
> >
> > On Tue, 20 Jun 2023, 20:51 John Roesler, <vvcep...@apache.org> wrote:
> >
> >> Ah, sorry Nick,
> >>
> >> I just meant the regular heap based cache that we maintain in Streams. I
> >> see that it's not called "RecordCache" (my mistake).
> >>
> >> The actual cache is ThreadCache:
> >>
> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
> >>
> >> Here's the example of how we use the cache in KeyValueStore:
> >>
> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
> >>
> >> It's basically just an on-heap Map of records that have not yet been
> >> written to the changelog or flushed into the underlying store. It gets
> >> flushed when the total cache size exceeds `cache.max.bytes.buffering` or
> >> the `commit.interval.ms` elapses.
> >>
> >> Speaking of those configs, another benefit to this idea is that we would
> >> no longer need to trigger extra commits based on the size of the ongoing
> >> transaction. Instead, we'd just preserve the existing cache-flush
> >> behavior. Note that users can disable the cache, which would still be
> >> ok, I think. We wouldn't ingest the SST files on every record, but just
> >> append to them and only ingest them on commit, when we're already
> >> waiting for acks and a RocksDB commit.
> >>
> >> Thanks,
> >> -John
> >>
> >> On 6/20/23 14:09, Nick Telford wrote:
> >>> Hi John,
> >>>
> >>> By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find
> any
> >>> class called "RecordCache"...
> >>>
> >>> Cheers,
> >>>
> >>> Nick
> >>>
> >>> On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org>
> wrote:
> >>>
> >>>> Hi Nick,
> >>>>
> >>>> Thanks for picking this up again!
> >>>>
> >>>> I did have one new thought over the intervening months, which I'd like
> >>>> your take on.
> >>>>
> >>>> What if, instead of using the RocksDB atomic write primitive at all,
> >>>> we just:
> >>>> 1. disable memtables entirely
> >>>> 2. directly write the RecordCache into SST files when we flush
> >>>> 3. atomically ingest the SST file(s) into RocksDB when we get the ACK
> >>>> from the changelog (see
> >>>> https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
> >>>> and
> >>>> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
> >>>> and
> >>>> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
> >>>> )
> >>>> 4. track the changelog offsets either in another CF or the same CF
> with
> >>>> a reserved key, either of which will make the changelog offset update
> >>>> atomic with the file ingestions
> >>>>
> >>>> I suspect this'll have a number of benefits:
> >>>> * writes to RocksDB will always be atomic
> >>>> * we don't fragment memory between the RecordCache and the memtables
> >>>> * RecordCache gives far higher performance than memtable for reads and
> >>>> writes
> >>>> * we don't need any new "transaction" concepts or memory bound configs
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Thanks,
> >>>> -John
> >>>>
> >>>> On 6/20/23 10:51, Nick Telford wrote:
> >>>>> Hi Bruno,
> >>>>>
> >>>>> Thanks for reviewing the KIP. It's been a long road, I started
> working
> >> on
> >>>>> this more than a year ago, and most of the time in the last 6 months
> >> has
> >>>>> been spent on the "Atomic Checkpointing" stuff that's been benched,
> so
> >>>> some
> >>>>> of the reasoning behind some of my decisions has been lost, but I'll
> >> do
> >>>> my
> >>>>> best to reconstruct them.
> >>>>>
> >>>>> 1.
> >>>>> IIRC, this was the initial approach I tried. I don't remember the
> exact
> >>>>> reasons I changed it to use a separate "view" of the StateStore that
> >>>>> encapsulates the transaction, but I believe it had something to do
> with
> >>>>> concurrent access to the StateStore from Interactive Query threads.
> >> Reads
> >>>>> from interactive queries need to be isolated from the currently
> ongoing
> >>>>> transaction, both for consistency (so interactive queries don't
> observe
> >>>>> changes that are subsequently rolled-back), but also to prevent
> >> Iterators
> >>>>> opened by an interactive query from being closed and invalidated by
> the
> >>>>> StreamThread when it commits the transaction, which causes your
> >>>> interactive
> >>>>> queries to crash.
> >>>>>
> >>>>> Another reason I believe I implemented it this way was a separation
> of
> >>>>> concerns. Recall that newTransaction() originally created an object
> of
> >>>> type
> >>>>> Transaction, not StateStore. My intent was to improve the type-safety
> >> of
> >>>>> the API, in an effort to ensure Transactions weren't used
> incorrectly.
> >>>>> Unfortunately, this didn't pan out, but newTransaction() remained.
> >>>>>
> >>>>> Finally, this had the added benefit that implementations could easily
> >> add
> >>>>> support for transactions *without* re-writing their existing,
> >>>>> non-transactional implementation. I think this can be a benefit both
> >> for
> >>>>> implementers of custom StateStores, but also for anyone extending
> >>>>> RocksDbStore, as they can rely on the existing access methods working
> >> how
> >>>>> they expect them to.
> >>>>>
> >>>>> I'm not too happy with the way the current design has panned out, so
> >> I'm
> >>>>> open to ideas on how to improve it. Key to this is finding some way
> to
> >>>>> ensure that reads from Interactive Query threads are properly
> isolated
> >>>> from
> >>>>> the transaction, *without* the performance overhead of checking which
> >>>>> thread the method is being called from on every access.
> >>>>>
> >>>>> As for replacing flush() with commit() - I saw no reason to add this
> >>>>> complexity to the KIP, unless there was a need to add arguments to
> the
> >>>>> flush/commit method. This need arises with Atomic Checkpointing, but
> >> that
> >>>>> will be implemented separately, in a future KIP. Do you see a need
> for
> >>>> some
> >>>>> arguments to the flush/commit method that I've missed? Or were you
> >> simply
> >>>>> suggesting a rename?
> >>>>>
> >>>>> 2.
> >>>>> This is simply due to the practical reason that isolationLevel() is
> >>>> really
> >>>>> a proxy for checking if the app is under EOS. The application
> >>>> configuration
> >>>>> is not provided to the constructor of StateStores, but it *is*
> provided
> >>>> to
> >>>>> init(), via StateStoreContext. For this reason, it seemed somewhat
> >>>> natural
> >>>>> to add it to StateStoreContext. I think this makes sense, since the
> >>>>> IsolationLevel of all StateStores in an application *must* be the
> same,
> >>>> and
> >>>>> since those stores are all initialized with the same
> StateStoreContext,
> >>>> it
> >>>>> seems natural for that context to carry the desired IsolationLevel to
> >>>> use.
> >>>>>
> >>>>> 3.
> >>>>> Using IsolationLevel instead of just passing `boolean eosEnabled`,
> like
> >>>>> much of the internals, was an attempt to logically de-couple the
> >>>> StateStore
> >>>>> API from the internals of Kafka Streams. Technically, StateStores
> don't
> >>>>> need to know/care what processing mode the KS app is using, all they
> >> need
> >>>>> to know is the isolation level expected of them.
> >>>>>
> >>>>> Having formal definitions for the expectations of the two required
> >>>>> IsolationLevels allows implementers to implement transactional stores
> >>>>> without having to dig through the internals of Kafka Streams and
> >>>> understand
> >>>>> exactly how they are used. The tight coupling between state stores
> and
> >>>>> internal behaviour has actually significantly hindered my progress on
> >>>> this
> >>>>> KIP, and encouraged me to avoid increasing this logical coupling as
> >> much
> >>>> as
> >>>>> possible.
> >>>>>
> >>>>> This also frees implementations to satisfy those requirements in any
> >> way
> >>>>> they choose. Transactions might not be the only/available approach to
> >> an
> >>>>> implementation, but they might have an alternative way to satisfy the
> >>>>> isolation requirements. I admit that this point is more about
> >> semantics,
> >>>>> but "transactional" would need to be formally defined in order for
> >>>>> implementers to provide a valid implementation, and these
> >> IsolationLevels
> >>>>> provide that formal definition.
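To illustrate point 3: a store that is handed an isolation level does not need to know which processing mode produced it. The sketch below is only an illustration. `ProcessingMode`, `SketchContext` and `SketchStore` are hypothetical names, not the real Kafka Streams types, and the mapping from EOS to READ_COMMITTED is an assumption made for the example.

```java
public class IsolationLevelSketch {
    // Hypothetical stand-ins; not the real Kafka Streams classes.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }
    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE }

    // The context, not the store, translates the app's processing mode.
    static final class SketchContext {
        private final ProcessingMode mode;

        SketchContext(ProcessingMode mode) {
            this.mode = mode;
        }

        IsolationLevel isolationLevel() {
            // Assumption for this sketch: EOS implies READ_COMMITTED.
            return mode == ProcessingMode.EXACTLY_ONCE
                ? IsolationLevel.READ_COMMITTED
                : IsolationLevel.READ_UNCOMMITTED;
        }
    }

    // The store only looks at the isolation level it was given in init().
    static final class SketchStore {
        private boolean bufferWrites;

        void init(SketchContext context) {
            bufferWrites = context.isolationLevel() == IsolationLevel.READ_COMMITTED;
        }

        boolean buffersUncommittedWrites() {
            return bufferWrites;
        }
    }

    public static void main(String[] args) {
        SketchStore store = new SketchStore();
        store.init(new SketchContext(ProcessingMode.EXACTLY_ONCE));
        System.out.println(store.buffersUncommittedWrites());
    }
}
```

The store never sees `ProcessingMode`, so it could satisfy the isolation requirement by some means other than transactions.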
> >>>>>
> >>>>> 4.
> >>>>> I can remove them. I added them only as I planned to include them in
> >> the
> >>>>> org.apache.kafka.streams.state package, as a recommended base
> >>>>> implementation for all StateStores, including those implemented by
> >>>> users. I
> >>>>> had assumed that anything in "public" packages, such as
> >>>>> org.apache.kafka.streams.state, should be included in a KIP. Is that
> >>>> wrong?
> >>>>>
> >>>>> 5.
> >>>>> RocksDB provides no way to measure the actual size of a
> >>>>> WriteBatch(WithIndex), so we're limited to tracking the sum total of
> >> the
> >>>>> size of keys + values that are written to the transaction. This
> >> obviously
> >>>>> under-estimates the actual memory usage, because WriteBatch no-doubt
> >>>>> includes some record overheads, and WriteBatchWithIndex has to
> maintain
> >>>> an
> >>>>> index.
> >>>>>
> >>>>> Ideally, we could trivially add a method upstream to
> >> WriteBatchInterface
> >>>>> that provides the exact size of the batch, but that would require an
> >>>>> upgrade of RocksDB, which won't happen soon. So for the time being,
> >> we're
> >>>>> stuck with an approximation, so I felt that the new method should
> >> reflect
> >>>>> that.
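The approximation described in point 5 can be sketched roughly as follows. This is a minimal illustration, not the KIP's implementation: it sums key and value lengths only, so it under-counts any per-record or index overhead. The class name and the in-memory list standing in for a write batch are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class UncommittedBytesSketch {
    // Stand-in for an uncommitted write batch: each entry is {key, value}.
    private final List<byte[][]> pending = new ArrayList<>();
    private long approximateBytes = 0L;

    void put(byte[] key, byte[] value) {
        pending.add(new byte[][] {key, value});
        // Count only the payload; a null value (delete) contributes no value bytes.
        approximateBytes += key.length + (value == null ? 0 : value.length);
    }

    // A lower bound on memory used by uncommitted writes, hence "approximate".
    long approximateNumUncommittedBytes() {
        return approximateBytes;
    }

    void commit() {
        pending.clear();
        approximateBytes = 0L;
    }

    public static void main(String[] args) {
        long maxBytes = 10L; // hypothetical limit, for illustration only
        UncommittedBytesSketch store = new UncommittedBytesSketch();
        store.put("k1".getBytes(StandardCharsets.UTF_8), "v1".getBytes(StandardCharsets.UTF_8));
        store.put("key2".getBytes(StandardCharsets.UTF_8), "value2".getBytes(StandardCharsets.UTF_8));
        if (store.approximateNumUncommittedBytes() > maxBytes) {
            store.commit(); // the caller decides to commit early once over the limit
        }
        System.out.println(store.approximateNumUncommittedBytes());
    }
}
```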
> >>>>>
> >>>>> Would you prefer the new method name ignores this constraint and that
> >> we
> >>>>> simply make the rocks measurement more accurate in the future?
> >>>>>
> >>>>> 6.
> >>>>> Done
> >>>>>
> >>>>> 7.
> >>>>> Very good point. The KIP already specifically calls out memory in the
> >>>>> documentation of the config: "Maximum number of memory bytes to be
> used
> >>>> to
> >>>>> buffer uncommitted state-store records." - did you have something
> else
> >> in
> >>>>> mind?
> >>>>>
> >>>>> Should we also make this clearer by renaming the config property
> >> itself?
> >>>>> Perhaps to something like statestore.transaction.buffer.max.bytes?
> >>>>>
> >>>>> 8.
> >>>>> OK, I can remove this. The intent here was to describe how Streams
> >> itself
> >>>>> will manage transaction roll-over etc. Presumably that means we also
> >>>> don't
> >>>>> need a description of how Streams will manage the commit of changelog
> >>>>> transactions, state store transactions and checkpointing?
> >>>>>
> >>>>> 9.
> >>>>> What do you mean by fail-over? Do you mean failing over an Active
> Task
> >> to
> >>>>> an instance already hosting a Standby Task?
> >>>>>
> >>>>> Thanks again and sorry for the essay of a response!
> >>>>>
> >>>>> Regards,
> >>>>> Nick
> >>>>>
> >>>>> On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org>
> >> wrote:
> >>>>>
> >>>>>> Hi Nick,
> >>>>>>
> >>>>>> Thanks for the updates!
> >>>>>>
> >>>>>> I really appreciate that you simplified the KIP by removing some
> >>>>>> aspects. As I have already told you, I think the removed aspects are
> >>>>>> also good ideas and we can discuss them on follow-up KIPs.
> >>>>>>
> >>>>>> Regarding the current KIP, I have the following feedback.
> >>>>>>
> >>>>>> 1.
> >>>>>> Is there a good reason to add method newTransaction() to the
> >> StateStore
> >>>>>> interface? As far as I understand, the idea is that users of a state
> >>>>>> store (transactional or not) call this method at start-up and after
> >> each
> >>>>>> commit. Since the call to newTransaction() is done in any case, I
> >>>>>> think it would simplify the caller code if we just started a new
> >>>>>> transaction after a commit in the implementation.
> >>>>>> As far as I understand, you plan to commit the transaction in the
> >>>>>> flush() method. I find the idea to replace flush() with commit()
> >>>>>> presented in KIP-844 an elegant solution.
> >>>>>>
> >>>>>> 2.
> >>>>>> Why is the method to query the isolation level added to the state
> >> store
> >>>>>> context?
> >>>>>>
> >>>>>> 3.
> >>>>>> Do we need all the isolation level definitions? I think it is good
> to
> >>>>>> know the guarantees of the transactionality of the state store.
> >>>>>> However, currently, Streams guarantees that there will only be one
> >>>>>> transaction that writes to the state store. Only the stream thread
> >> that
> >>>>>> executes the active task that owns the state store will write to the
> >>>>>> state store. I think it should be enough to know if the state store
> is
> >>>>>> transactional or not. So my proposal would be to just add a method
> on
> >>>>>> the state store interface that returns whether a state store is
> >> transactional
> >>>>>> or not by returning a boolean or an enum.
> >>>>>>
> >>>>>> 4.
> >>>>>> I am wondering why AbstractTransaction and
> AbstractTransactionalStore
> >>>>>> are part of the KIP. They look like implementation details that
> should
> >>>>>> not be exposed in the public API.
> >>>>>>
> >>>>>> 5.
> >>>>>> Why does StateStore#approximateNumUncommittedBytes() return an
> >>>>>> approximate number of bytes?
> >>>>>>
> >>>>>> 6.
> >>>>>> RocksDB is just one implementation of the state stores in Streams.
> >>>>>> However, the issues regarding OOM errors might also apply to other
> >>>>>> custom implementations. So in the KIP I would extract that part from
> >>>>>> section "RocksDB Transaction". I would also move section "RocksDB
> >>>>>> Transaction" to the end of section "Proposed Changes" and handle it
> as
> >>>>>> an example implementation for a state store.
> >>>>>>
> >>>>>> 7.
> >>>>>> Should statestore.uncommitted.max.bytes only limit the uncommitted
> >> bytes
> >>>>>> or the uncommitted bytes that reside in memory? In future, other
> >>>>>> transactional state store implementations might implement a buffer
> for
> >>>>>> uncommitted records that are able to spill records on disk. I think
> >>>>>> statestore.uncommitted.max.bytes needs to limit the uncommitted
> bytes
> >>>>>> irrespective of whether they reside in memory or on disk. Since Streams will
> use
> >>>>>> this config to decide if it needs to trigger a commit, state store
> >>>>>> implementations that can spill to disk will never be able to spill
> to
> >>>>>> disk. You would only need to change the doc of the config, if you
> >> agree
> >>>>>> with me.
> >>>>>>
> >>>>>> 8.
> >>>>>> Section "Transaction Management" about the wrappers is rather an
> >>>>>> implementation detail that should not be in the KIP.
> >>>>>>
> >>>>>> 9.
> >>>>>> Could you add a section that describes how failover will work with
> the
> >>>>>> transactional state stores? I think section "Error handling" is
> >> already
> >>>>>> a good start.
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 15.05.23 11:04, Nick Telford wrote:
> >>>>>>> Hi everyone,
> >>>>>>>
> >>>>>>> Quick update: I've added a new section to the KIP: "Offsets for
> >>>> Consumer
> >>>>>>> Rebalances", that outlines my solution to the problem that
> >>>>>>> StreamsPartitionAssignor needs to read StateStore offsets even if
> >>>> they're
> >>>>>>> not currently open.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Nick
> >>>>>>>
> >>>>>>> On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Bruno,
> >>>>>>>>
> >>>>>>>> Thanks for reviewing my proposal.
> >>>>>>>>
> >>>>>>>> 1.
> >>>>>>>> The main reason I added it was because it was easy to do. If we
> see
> >> no
> >>>>>>>> value in it, I can remove it.
> >>>>>>>>
> >>>>>>>> 2.
> >>>>>>>> Global StateStores can have multiple partitions in their input
> >> topics
> >>>>>>>> (which function as their changelogs), so they would have more than
> >> one
> >>>>>>>> partition.
> >>>>>>>>
> >>>>>>>> 3.
> >>>>>>>> That's a good point. At present, the only method it adds is
> >>>>>>>> isolationLevel(), which is likely not necessary outside of
> >>>> StateStores.
> >>>>>>>> It *does* provide slightly different guarantees in the
> documentation
> >>>> to
> >>>>>>>> several of the methods (hence the overrides). I'm not sure if this
> >> is
> >>>>>>>> enough to warrant a new interface though.
> >>>>>>>> I think the question that remains is whether this interface makes
> it
> >>>>>>>> easier to implement custom transactional StateStores than if we
> were
> >>>> to
> >>>>>>>> remove it? Probably not.
> >>>>>>>>
> >>>>>>>> 4.
> >>>>>>>> The main motivation for the Atomic Checkpointing is actually
> >>>>>> performance.
> >>>>>>>> My team has been testing out an implementation of this KIP without
> >> it,
> >>>>>> and
> >>>>>>>> we had problems with RocksDB doing *much* more compaction, due to
> >> the
> >>>>>>>> significantly increased flush rate. It was enough of a problem
> that
> >>>> (for
> >>>>>>>> the time being), we had to revert back to Kafka Streams proper.
> >>>>>>>> I think the best way to solve this, as you say, is to keep the
> >>>>>> .checkpoint
> >>>>>>>> files *in addition* to the offsets being stored within the store
> >>>> itself.
> >>>>>>>> Essentially, when closing StateStores, we force a memtable flush,
> >> then
> >>>>>>>> call getCommittedOffsets and write those out to the .checkpoint
> >> file.
> >>>>>>>> That would ensure the metadata is available to the
> >>>>>>>> StreamsPartitionAssignor for all closed stores.
> >>>>>>>> If there's a crash (no clean close), then we won't be able to
> >>>> guarantee
> >>>>>>>> which offsets were flushed to disk by RocksDB, so we'd need to
> open
> >> (
> >>>>>>>> init()), read offsets, and then close() those stores. But since
> this
> >>>> is
> >>>>>>>> the exception, and will only occur once (provided it doesn't crash
> >>>> every
> >>>>>>>> time!), I think the performance impact here would be acceptable.
> >>>>>>>>
> >>>>>>>> Thanks for the feedback, please let me know if you have any more
> >>>>>> comments
> >>>>>>>> or questions!
> >>>>>>>>
> >>>>>>>> I'm currently working on rebasing against trunk. This involves
> >> adding
> >>>>>>>> support for transactionality to VersionedStateStores. I will
> >> probably
> >>>>>> need
> >>>>>>>> to revise my implementation for transactional "segmented" stores,
> >> both
> >>>>>> to
> >>>>>>>> accommodate VersionedStateStore, and to clean up some other stuff.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Nick
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Nick,
> >>>>>>>>>
> >>>>>>>>> Thanks for the updates!
> >>>>>>>>>
> >>>>>>>>> I have a couple of questions/comments.
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>> Why do you propose a configuration that involves max. bytes and
> >> max.
> >>>>>>>>> records? I think we are mainly concerned about memory consumption
> >>>>>> because
> >>>>>>>>> we want to limit the off-heap memory used. I cannot think of a
> case
> >>>>>>>>> where one would want to set the max. number of records.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 2.
> >>>>>>>>> Why does
> >>>>>>>>>
> >>>>>>>>>       default void commit(final Map<TopicPartition, Long>
> >>>>>> changelogOffsets) {
> >>>>>>>>>           flush();
> >>>>>>>>>       }
> >>>>>>>>>
> >>>>>>>>> take a map of partitions to changelog offsets?
> >>>>>>>>> The mapping between state stores and partitions is a 1:1
> >> relationship.
> >>>>>>>>> Passing in a single changelog offset should suffice.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 3.
> >>>>>>>>> Why do we need the Transaction interface? It should be possible
> to
> >>>> hide
> >>>>>>>>> beginning and committing transactions within the state store
> >>>>>>>>> implementation, so that from outside the state store, it does not
> >>>>>> matter
> >>>>>>>>> whether the state store is transactional or not. What would be
> the
> >>>>>>>>> advantage of using the Transaction interface?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 4.
> >>>>>>>>> Regarding checkpointing offsets, I think we should keep the
> >>>> checkpoint
> >>>>>>>>> file in any case for the reason you mentioned about rebalancing.
> >> Even
> >>>>>> if
> >>>>>>>>> that would not be an issue, I would propose to move the change to
> >>>>>> offset
> >>>>>>>>> management to a new KIP and to not add more complexity than
> needed
> >> to
> >>>>>>>>> this one. I would not be too concerned about the consistency
> >>>> violation
> >>>>>>>>> you mention. As far as I understand, with transactional state
> >> stores
> >>>>>>>>> Streams would write the checkpoint file during every commit even
> >>>> under
> >>>>>>>>> EOS. In the failure case you describe, Streams would restore the
> >>>> state
> >>>>>>>>> stores from the offsets found in the checkpoint file written
> during
> >>>> the
> >>>>>>>>> penultimate commit instead of during the last commit. Basically,
> >>>>>> Streams
> >>>>>>>>> would overwrite the records written to the state store between
> the
> >>>> last
> >>>>>>>>> two commits with the same records read from the changelogs.
> While I
> >>>>>>>>> understand that this is wasteful, it is -- at the same time --
> >>>>>>>>> acceptable and most importantly it does not break EOS.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 27.04.23 12:34, Nick Telford wrote:
> >>>>>>>>>> Hi everyone,
> >>>>>>>>>>
> >>>>>>>>>> I find myself (again) considering removing the offset management
> >>>> from
> >>>>>>>>>> StateStores, and keeping the old checkpoint file system. The
> >> reason
> >>>> is
> >>>>>>>>> that
> >>>>>>>>>> the StreamPartitionAssignor directly reads checkpoint files in
> >> order
> >>>>>> to
> >>>>>>>>>> determine which instance has the most up-to-date copy of the
> local
> >>>>>>>>> state.
> >>>>>>>>>> If we move offsets into the StateStore itself, then we will need
> >> to
> >>>>>>>>> open,
> >>>>>>>>>> initialize, read offsets and then close each StateStore (that is
> >> not
> >>>>>>>>>> already assigned and open) for which we have *any* local state,
> on
> >>>>>> every
> >>>>>>>>>> rebalance.
> >>>>>>>>>>
> >>>>>>>>>> Generally, I don't think there are many "orphan" stores like
> this
> >>>>>>>>> sitting
> >>>>>>>>>> around on most instances, but even a few would introduce
> >> additional
> >>>>>>>>> latency
> >>>>>>>>>> to an already somewhat lengthy rebalance procedure.
> >>>>>>>>>>
> >>>>>>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping
> >> things
> >>>>>> in
> >>>>>>>>> the
> >>>>>>>>>> checkpoint file(s) for now, and not worrying about the race. The
> >>>>>>>>> downside
> >>>>>>>>>> is that we wouldn't be able to remove the explicit RocksDB flush
> >>>>>>>>> on-commit,
> >>>>>>>>>> which likely hurts performance.
> >>>>>>>>>>
> >>>>>>>>>> If anyone has any thoughts or ideas on this subject, I would
> >>>>>> appreciate
> >>>>>>>>> it!
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Nick
> >>>>>>>>>>
> >>>>>>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford <
> >> nick.telf...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>
> >>>>>>>>>>> The issue is that if there's a crash between 2 and 3, then you
> >>>> still
> >>>>>>>>> end
> >>>>>>>>>>> up with inconsistent data in RocksDB. The only way to guarantee
> >>>> that
> >>>>>>>>> your
> >>>>>>>>>>> checkpoint offsets and locally stored data are consistent with
> >> each
> >>>>>>>>> other
> >>>>>>>>>>> is to atomically commit them, which can be achieved by having
> >> the
> >>>>>>>>> offsets
> >>>>>>>>>>> stored in RocksDB.
> >>>>>>>>>>>
> >>>>>>>>>>> The offsets column family is likely to be extremely small (one
> >>>>>>>>>>> per-changelog partition + one per Topology input partition for
> >>>>>> regular
> >>>>>>>>>>> stores, one per input partition for global stores). So the
> >> overhead
> >>>>>>>>> will be
> >>>>>>>>>>> minimal.
> >>>>>>>>>>>
> >>>>>>>>>>> A major benefit of doing this is that we can remove the
> explicit
> >>>>>> calls
> >>>>>>>>> to
> >>>>>>>>>>> db.flush(), which forcibly flushes memtables to disk on-commit.
> >> It
> >>>>>>>>> turns
> >>>>>>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka
> >> Streams
> >>>>>>>>>>> commits, *not* RocksDB configuration, which could be a major
> >> source
> >>>>>> of
> >>>>>>>>>>> confusion. Atomic checkpointing makes it safe to remove these
> >>>>>> explicit
> >>>>>>>>>>> flushes, because it no longer matters exactly when RocksDB
> >> flushes
> >>>>>>>>> data to
> >>>>>>>>>>> disk; since the data and corresponding checkpoint offsets will
> >>>> always
> >>>>>>>>> be
> >>>>>>>>>>> flushed together, the local store is always in a consistent
> >> state,
> >>>>>> and
> >>>>>>>>>>> on-restart, it can always safely resume restoration from the
> >>>> on-disk
> >>>>>>>>>>> offsets, restoring the small amount of data that hadn't been
> >>>> flushed
> >>>>>>>>> when
> >>>>>>>>>>> the app exited/crashed.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Nick
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <
> c...@littlehorse.io>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Nick,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for your reply. Ack to A) and B).
> >>>>>>>>>>>>
> >>>>>>>>>>>> For item C), I see what you're referring to. Your proposed
> >>>> solution
> >>>>>>>>> will
> >>>>>>>>>>>> work, so no need to change it. What I was suggesting was that
> it
> >>>>>>>>> might be
> >>>>>>>>>>>> possible to achieve this with only one column family. So long
> >> as:
> >>>>>>>>>>>>
> >>>>>>>>>>>>         - No uncommitted records (i.e. not committed to the
> >>>> changelog)
> >>>>>> are
> >>>>>>>>>>>>         *committed* to the state store, AND
> >>>>>>>>>>>>         - The Checkpoint offset (which refers to the changelog
> >>>> topic)
> >>>>>> is
> >>>>>>>>> less
> >>>>>>>>>>>>         than or equal to the last written changelog offset in
> >>>> rocksdb
> >>>>>>>>>>>>
> >>>>>>>>>>>> I don't see the need to do the full restoration from scratch.
> My
> >>>>>>>>>>>> understanding was that prior to 844/892, full restorations
> were
> >>>>>>>>> required
> >>>>>>>>>>>> because there could be uncommitted records written to RocksDB;
> >>>>>>>>> however,
> >>>>>>>>>>>> given your use of RocksDB transactions, that can be avoided
> with
> >>>> the
> >>>>>>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB
> >>>>>>>>> transaction, 3)
> >>>>>>>>>>>> update offset in checkpoint file.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Anyways, your proposed solution works equivalently and I don't
> >>>>>> believe
> >>>>>>>>>>>> there is much overhead to an additional column family in
> >> RocksDB.
> >>>>>>>>> Perhaps
> >>>>>>>>>>>> it may even perform better than making separate writes to the
> >>>>>>>>> checkpoint
> >>>>>>>>>>>> file.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <
> >>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> A. I've done my best to de-couple the StateStore stuff from
> the
> >>>>>> rest
> >>>>>>>>> of
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> Streams engine. The fact that there will be only one ongoing
> >>>>>> (write)
> >>>>>>>>>>>>> transaction at a time is not guaranteed by any API, and is
> >> just a
> >>>>>>>>>>>>> consequence of the way Streams operates. To that end, I tried
> >> to
> >>>>>>>>> ensure
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> documentation and guarantees provided by the new APIs are
> >>>>>>>>> independent of
> >>>>>>>>>>>>> this incidental behaviour. In practice, you're right, this
> >>>>>>>>> essentially
> >>>>>>>>>>>>> refers to "interactive queries", which are technically "read
> >>>>>>>>>>>> transactions",
> >>>>>>>>>>>>> even if they don't actually use the transaction API to
> isolate
> >>>>>>>>>>>> themselves.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> B. Yes, although not ideal. This is for backwards
> >> compatibility,
> >>>>>>>>>>>> because:
> >>>>>>>>>>>>>          1) Existing custom StateStore implementations will
> >>>> implement
> >>>>>>>>>>>> flush(),
> >>>>>>>>>>>>> and not commit(), but the Streams engine now calls commit(),
> so
> >>>>>> those
> >>>>>>>>>>>> calls
> >>>>>>>>>>>>> need to be forwarded to flush() for these legacy stores.
> >>>>>>>>>>>>>          2) Existing StateStore *users*, i.e. outside of the
> >>>> Streams
> >>>>>>>>> engine
> >>>>>>>>>>>>> itself, may depend on explicitly calling flush(), so for
> these
> >>>>>> cases,
> >>>>>>>>>>>>> flush() needs to be redirected to call commit().
> >>>>>>>>>>>>> If anyone has a better way to guarantee compatibility without
> >>>>>>>>>>>> introducing
> >>>>>>>>>>>>> this potential recursion loop, I'm open to changes!
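The mutual delegation described in B can be sketched as below. This is an illustration under assumptions, not the KIP's code: `SketchStore` is a hypothetical stand-in for StateStore, `String` replaces TopicPartition, and having flush() pass an empty map to commit() is a guess at the redirect. A store that overrides neither method would recurse until the stack overflows, which is the loop Colt asked about.

```java
import java.util.Map;

public class FlushCommitSketch {
    // Hypothetical stand-in for StateStore; String replaces TopicPartition.
    interface SketchStore {
        // Legacy entry point: forwards to commit() unless overridden.
        default void flush() {
            commit(Map.of());
        }

        // New entry point: forwards to flush() unless overridden.
        default void commit(Map<String, Long> changelogOffsets) {
            flush();
        }
    }

    // Existing custom store: implements only flush().
    static final class LegacyStore implements SketchStore {
        int flushes = 0;

        @Override
        public void flush() {
            flushes++;
        }
    }

    // New-style store: implements only commit().
    static final class TransactionalStore implements SketchStore {
        int commits = 0;

        @Override
        public void commit(Map<String, Long> changelogOffsets) {
            commits++;
        }
    }

    public static void main(String[] args) {
        LegacyStore legacy = new LegacyStore();
        legacy.commit(Map.of("changelog-0", 42L)); // engine calls commit(); reaches flush()

        TransactionalStore tx = new TransactionalStore();
        tx.flush(); // legacy caller calls flush(); reaches commit()

        System.out.println(legacy.flushes + " " + tx.commits);
    }
}
```

Overriding either method breaks the cycle, so the defaults only loop for a store that implements neither.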
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> C. This is described in the "Atomic Checkpointing" section.
> >>>> Offsets
> >>>>>>>>> are
> >>>>>>>>>>>>> stored in a separate RocksDB column family, which is
> guaranteed
> >>>> to
> >>>>>> be
> >>>>>>>>>>>>> atomically flushed to disk with all other column families.
> The
> >>>>>> issue
> >>>>>>>>> of
> >>>>>>>>>>>>> checkpoints being written to disk after commit causing
> >>>>>> inconsistency
> >>>>>>>>> if
> >>>>>>>>>>>> it
> >>>>>>>>>>>>> crashes in between is the reason why, under EOS, checkpoint
> >> files
> >>>>>> are
> >>>>>>>>>>>> only
> >>>>>>>>>>>>> written on clean shutdown. This is one of the major causes of
> >>>> "full
> >>>>>>>>>>>>> restorations", so moving the offsets into a place where they
> >> can
> >>>> be
> >>>>>>>>>>>>> guaranteed to be atomically written with the data they
> >> checkpoint
> >>>>>>>>>>>> allows us
> >>>>>>>>>>>>> to write the checkpoint offsets *on every commit*, not just
> on
> >>>>>> clean
> >>>>>>>>>>>>> shutdown.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <
> >> c...@littlehorse.io>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you for continuing this work. I have a few minor
> >>>> clarifying
> >>>>>>>>>>>>>> questions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> A) "Records written to any transaction are visible to all
> >> other
> >>>>>>>>>>>>>> transactions immediately." I am confused here—I thought
> there
> >>>>>> could
> >>>>>>>>>>>> only
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>> one transaction going on at a time for a given state store
> >> given
> >>>>>> the
> >>>>>>>>>>>>>> threading model for processing records on a Task. Do you
> mean
> >>>>>>>>>>>> Interactive
> >>>>>>>>>>>>>> Queries by "other transactions"? (If so, then everything
> makes
> >>>>>>>>> sense—I
> >>>>>>>>>>>>>> thought that since IQ were read-only then they didn't count
> as
> >>>>>>>>>>>>>> transactions).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> B) Is it intentional that the default implementations of the
> >>>>>> flush()
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>> commit() methods in the StateStore class refer to each other
> >> in
> >>>>>> some
> >>>>>>>>>>>> sort
> >>>>>>>>>>>>>> of unbounded recursion?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> C) How will the getCommittedOffset() method work? At first I
> >>>>>> thought
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> way to do it would be using a special key in the RocksDB
> store
> >>>> to
> >>>>>>>>>>>> store
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> offset, and committing that with the transaction. But upon
> >>>> second
> >>>>>>>>>>>>> thought,
> >>>>>>>>>>>>>> since restoration from the changelog is an idempotent
> >>>> procedure, I
> >>>>>>>>>>>> think
> >>>>>>>>>>>>> it
> >>>>>>>>>>>>>> would be fine to 1) commit the RocksDB transaction and then
> 2)
> >>>>>> write
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> offset to disk in a checkpoint file. If there is a crash
> >> between
> >>>>>> 1)
> >>>>>>>>>>>> and
> >>>>>>>>>>>>> 2),
> >>>>>>>>>>>>>> I think the only downside is now we replay a few more
> records
> >>>> (at
> >>>>>> a
> >>>>>>>>>>>> cost
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>> <100ms). Am I missing something there?
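The idempotence claim in C) can be illustrated with a minimal sketch. It assumes a changelog made only of puts and tombstones; `ChangelogReplaySketch`, `Entry` and `replay` are hypothetical names, not Kafka Streams APIs. It shows only that replaying from a stale checkpoint converges to the same state; it does not model the EOS consistency concern about uncommitted writes discussed elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ChangelogReplaySketch {

    /** One changelog entry; a null value stands for a tombstone (delete). */
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Applies every changelog entry from fromOffset (inclusive) to the end. */
    static void replay(Map<String, String> store, List<Entry> changelog, int fromOffset) {
        for (int offset = fromOffset; offset < changelog.size(); offset++) {
            Entry e = changelog.get(offset);
            if (e.value == null) {
                store.remove(e.key);
            } else {
                store.put(e.key, e.value);
            }
        }
    }

    /** True if a store whose checkpoint lags behind its data converges after replay. */
    static boolean staleCheckpointConverges() {
        List<Entry> changelog = new ArrayList<>();
        changelog.add(new Entry("a", "1"));
        changelog.add(new Entry("b", "2"));
        changelog.add(new Entry("a", null));
        changelog.add(new Entry("b", "3"));

        // Reference state: a clean restore from offset 0.
        Map<String, String> expected = new TreeMap<>();
        replay(expected, changelog, 0);

        // Crashed store: all four entries were applied, but the checkpoint
        // file still says offset 2, so entries 2 and 3 are applied twice.
        Map<String, String> crashed = new TreeMap<>();
        replay(crashed, changelog, 0);
        replay(crashed, changelog, 2);

        return expected.equals(crashed);
    }
}
```

Replaying a suffix over a store that already contains it is harmless here because each key ends up with the value of its last entry either way.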
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Other than that, everything makes sense to me.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford <
> >>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I've updated the KIP to reflect the latest version of the
> >>>> design:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> There are several changes in there that reflect feedback
> from
> >>>>>> this
> >>>>>>>>>>>>>> thread,
> >>>>>>>>>>>>>>> and there's a new section and a bunch of interface changes
> >>>>>> relating
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> Atomic Checkpointing, which is the final piece of the
> puzzle
> >> to
> >>>>>>>>>>>> making
> >>>>>>>>>>>>>>> everything robust.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let me know what you think!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford <
> >>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Lucas,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks for looking over my KIP.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a
> typo
> >> in
> >>>>>> the
> >>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>> that I've now corrected. It was originally per-Task, but I
> >>>>>>>>>>>> changed it
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> per-instance for exactly the reason you highlighted.
> >>>>>>>>>>>>>>>> B) It's worth noting that transactionality is only enabled
> >>>> under
> >>>>>>>>>>>> EOS,
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> in the default mode of operation (ALOS), there should be
> no
> >>>>>>>>>>>> change in
> >>>>>>>>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the
> >>>> impact
> >>>>>> on
> >>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>> sufficiently low default values for the memory bound
> >>>>>>>>>>>> configuration. I
> >>>>>>>>>>>>>>>> understand your hesitation to include a significant change
> >> of
> >>>>>>>>>>>>>> behaviour,
> >>>>>>>>>>>>>>>> especially in a minor release, but I suspect that most
> users
> >>>>>> will
> >>>>>>>>>>>>>> prefer
> >>>>>>>>>>>>>>>> the memory impact (under EOS) to the existing behaviour of
> >>>>>>>>>>>> frequent
> >>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>> restorations! If this is a problem, the changes can wait
> >> until
> >>>>>> the
> >>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>> major release. I'll be running a patched version of
> streams
> >> in
> >>>>>>>>>>>>>> production
> >>>>>>>>>>>>>>>> with these changes as soon as they're ready, so it won't
> >>>> disrupt
> >>>>>>>>>>>> me
> >>>>>>>>>>>>> :-D
> >>>>>>>>>>>>>>>> C) The main purpose of this sentence was just to note that
> >>>> some
> >>>>>>>>>>>>> changes
> >>>>>>>>>>>>>>>> will need to be made to the way Segments are handled in
> >> order
> >>>> to
> >>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>> they also benefit from transactions. At the time I wrote
> >> it, I
> >>>>>>>>>>>> hadn't
> >>>>>>>>>>>>>>>> figured out the specific changes necessary, so it was
> >>>>>> deliberately
> >>>>>>>>>>>>>> vague.
> >>>>>>>>>>>>>>>> This is the one outstanding problem I'm currently working
> >> on,
> >>>>>> and
> >>>>>>>>>>>>> I'll
> >>>>>>>>>>>>>>>> update this section with more detail once I have figured
> out
> >>>> the
> >>>>>>>>>>>>> exact
> >>>>>>>>>>>>>>>> changes required.
> >>>>>>>>>>>>>>>> D) newTransaction() provides the necessary isolation
> >>>> guarantees.
> >>>>>>>>>>>>> While
> >>>>>>>>>>>>>>>> the RocksDB implementation of transactions doesn't
> >> technically
> >>>>>>>>>>>> *need*
> >>>>>>>>>>>>>>>> read-only users to call newTransaction(), other
> >>>> implementations
> >>>>>>>>>>>>> (e.g. a
> >>>>>>>>>>>>>>>> hypothetical PostgresStore) may require it. Calling
> >>>>>>>>>>>> newTransaction()
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>> no transaction is necessary is essentially free, as it
> will
> >>>> just
> >>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>> this.
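The "essentially free" default described in D) could look roughly like the sketch below. `SketchStore` and `InMemorySketchStore` are hypothetical stand-ins, not the interfaces proposed in the KIP; the only point taken from the message is that a store which needs no transaction returns itself from `newTransaction()`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store interface for illustration; not the KIP's actual API. */
interface SketchStore {
    String get(String key);

    void put(String key, String value);

    /** Default: a store that needs no transaction hands back itself. */
    default SketchStore newTransaction() {
        return this;
    }
}

/** Non-transactional in-memory store that relies on the default above. */
final class InMemorySketchStore implements SketchStore {
    private final Map<String, String> data = new HashMap<>();

    @Override
    public String get(String key) {
        return data.get(key);
    }

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }
}
```

A read-only caller can then always go through `newTransaction()` without knowing whether the underlying store is transactional; a store that does need isolation would override the default.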
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think
> it
> >>>>>>>>>>>> should
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> fairly obvious where the performance problems stem from:
> >>>> writes
> >>>>>>>>>>>> under
> >>>>>>>>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with
> >> the
> >>>>>>>>>>>>>>>> tombstone/record flag, 1 to decode it from the
> >>>> tombstone/record
> >>>>>>>>>>>> flag,
> >>>>>>>>>>>>>>> and 1
> >>>>>>>>>>>>>>>> to copy the record from the "temporary" store to the
> "main"
> >>>>>> store,
> >>>>>>>>>>>>> when
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> transaction commits. The different approach taken by
> KIP-892
> >>>>>>>>>>>> should
> >>>>>>>>>>>>>>> perform
> >>>>>>>>>>>>>>>> much better, as it avoids all these copies, and may
> actually
> >>>>>>>>>>>> perform
> >>>>>>>>>>>>>>>> slightly better than trunk, due to batched writes in
> RocksDB
> >>>>>>>>>>>>> performing
> >>>>>>>>>>>>>>>> better than non-batched writes.[1]
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <
> >>>>>>>>>>>> lbruts...@confluent.io
> >>>>>>>>>>>>>>> .invalid>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'm just starting to read up on the whole discussion
> about
> >>>>>>>>>>>> KIP-892
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think
> >>>>>>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do
> have
> >>>> some
> >>>>>>>>>>>>>>>>> questions about the latest draft.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>       A) If I understand correctly, you propose to put a
> >> bound
> >>>> on
> >>>>>> the
> >>>>>>>>>>>>>>>>> (native) memory consumed by each task. However, I wonder
> if
> >>>>>> this
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> sufficient if we have temporary imbalances in the
> cluster.
> >>>> For
> >>>>>>>>>>>>>>>>> example, depending on the timing of rebalances during a
> >>>> cluster
> >>>>>>>>>>>>>>>>> restart, it could happen that a single streams node is
> >>>>>> assigned a
> >>>>>>>>>>>>> lot
> >>>>>>>>>>>>>>>>> more tasks than expected. With your proposed change, this
> >>>> would
> >>>>>>>>>>>> mean
> >>>>>>>>>>>>>>>>> that the memory required by this one node could be a
> >> multiple
> >>>>>> of
> >>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>> is required during normal operation. I wonder if it
> >> wouldn't
> >>>> be
> >>>>>>>>>>>>> safer
> >>>>>>>>>>>>>>>>> to put a global bound on the memory use, across all
> tasks.
> >>>>>>>>>>>>>>>>>       B) Generally, the memory concerns still give me the
> >>>> feeling
> >>>>>>>>> that
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> should not be enabled by default for all users in a minor
> >>>>>>>>>>>> release.
> >>>>>>>>>>>>>>>>>       C) In section "Transaction Management": the
> sentence
> >> "A
> >>>>>> similar
> >>>>>>>>>>>>>>>>> analogue will be created to automatically manage
> `Segment`
> >>>>>>>>>>>>>>>>> transactions.". Maybe this is just me lacking some
> >>>> background,
> >>>>>>>>>>>> but I
> >>>>>>>>>>>>>>>>> do not understand this; it would be great if you could
> >>>> clarify
> >>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>> you mean here.
> >>>>>>>>>>>>>>>>>       D) Could you please clarify why IQ has to call
> >>>>>>>>> newTransaction(),
> >>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>> it's read-only?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> And one last thing not strictly related to your KIP: if
> >> there
> >>>>>> is
> >>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x
> >>>> slower
> >>>>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>> by providing a flame graph), that would be quite
> >> interesting.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <
> >>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I've updated the KIP with a more detailed design, which
> >>>>>>>>>>>> reflects
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> implementation I've been working on:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This new design should address the outstanding points
> >>>> already
> >>>>>>>>>>>> made
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> thread.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Please let me know if there are areas that are unclear
> or
> >>>> need
> >>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>> clarification.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I have a (nearly) working implementation. I'm confident
> >> that
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> remaining
> >>>>>>>>>>>>>>>>>> work (making Segments behave) will not impact the
> >> documented
> >>>>>>>>>>>>> design.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy <
> >>>>>> c...@littlehorse.io
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was hoping
> >>>> that,
> >>>>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the
> >>>>>>>>>>>> documented
> >>>>>>>>>>>>>> API,
> >>>>>>>>>>>>>>>>> maybe
> >>>>>>>>>>>>>>>>>>> you *wouldn't* have to wait for the next major release
> to
> >>>>>>>>>>>> make
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> change;
> >>>>>>>>>>>>>>>>>>> but given that it would be considered a major change, I
> >>>> like
> >>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>>>>>> the best.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding!
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <
> >>>>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended
> >> to
> >>>>>>>>>>>> try
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> keep the
> >>>>>>>>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise
> we'd
> >>>>>>>>>>>> have
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> wait for
> >>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> major version release to land these changes.
> >>>>>>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of
> >>>>>>>>>>>> guarantee,
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> typically longer commit intervals would be problematic
> >>>> when
> >>>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>> "committed" records.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor
> surgery,
> >>>>>>>>>>>> but I
> >>>>>>>>>>>>>>>>> spent a
> >>>>>>>>>>>>>>>>>>>> considerable amount of that time working through ideas
> >> for
> >>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>>> solutions in my head. I think your suggestion of
> keeping
> >>>>>>>>>>>> ALOS
> >>>>>>>>>>>>>>>>> as-is, but
> >>>>>>>>>>>>>>>>>>>> buffering writes for EOS is the right path forwards,
> >>>>>>>>>>>> although
> >>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>> have a
> >>>>>>>>>>>>>>>>>>>> solution that both expands on this, and provides for
> >> some
> >>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>> formal
> >>>>>>>>>>>>>>>>>>>> guarantees.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for
> >>>>>>>>>>>>>> "Transactions",
> >>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read
> Committed"
> >>>>>>>>>>>> when
> >>>>>>>>>>>>>> under
> >>>>>>>>>>>>>>>>> EOS,
> >>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> "Read Uncommitted" under ALOS.
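One way to picture the two isolation levels is the minimal sketch below. `SketchIsolationLevel` and `BufferedSketchStore` are hypothetical names, the buffer is a plain map rather than a RocksDB write batch, and deletes are left out for brevity; it is an illustration of the idea, not the design in the KIP.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical isolation levels, named after the two levels mentioned above. */
enum SketchIsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

/** Buffers writes until commit(); readers choose which view they get. */
final class BufferedSketchStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();

    /** Writes land in the uncommitted buffer only. */
    void put(String key, String value) {
        uncommitted.put(key, value);
    }

    /** READ_UNCOMMITTED overlays the buffer on the committed data. */
    String get(String key, SketchIsolationLevel level) {
        if (level == SketchIsolationLevel.READ_UNCOMMITTED && uncommitted.containsKey(key)) {
            return uncommitted.get(key);
        }
        return committed.get(key);
    }

    /** Makes all buffered writes visible to READ_COMMITTED readers. */
    void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
    }

    /** Read Committed under EOS, Read Uncommitted under ALOS. */
    static SketchIsolationLevel levelFor(boolean eosEnabled) {
        return eosEnabled
                ? SketchIsolationLevel.READ_COMMITTED
                : SketchIsolationLevel.READ_UNCOMMITTED;
    }
}
```

With this shape, an ALOS reader sees an update as soon as it is written, while an EOS reader sees it only after the next commit.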
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The nice thing about this approach is that it gives us
> >>>> much
> >>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>> clearly
> >>>>>>>>>>>>>>>>>>>> defined isolation behaviour that can be properly
> >>>>>>>>>>>> documented to
> >>>>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>>>>>>> know what to expect.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and
> will
> >>>>>>>>>>>> update
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>> I have something. The main struggle is trying to
> >> implement
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>> making any major changes to the existing interfaces or
> >>>>>>>>>>>>> breaking
> >>>>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>> implementations, because currently everything expects
> to
> >>>>>>>>>>>>> operate
> >>>>>>>>>>>>>>>>> directly
> >>>>>>>>>>>>>>>>>>>> on a StateStore, and not a Transaction of that store.
> I
> >>>>>>>>>>>> think
> >>>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>> getting
> >>>>>>>>>>>>>>>>>>>> close, although sadly I won't be able to progress much
> >>>>>>>>>>>> until
> >>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>> week
> >>>>>>>>>>>>>>>>>>> due
> >>>>>>>>>>>>>>>>>>>> to some work commitments.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy <
> >>>>>>>>>>>>> c...@littlehorse.io>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thank you for the explanation, and also for the
> updated
> >>>>>>>>>>>>> KIP. I
> >>>>>>>>>>>>>>> am
> >>>>>>>>>>>>>>>>> quite
> >>>>>>>>>>>>>>>>>>>>> eager for this improvement to be released as it would
> >>>>>>>>>>>>> greatly
> >>>>>>>>>>>>>>>>> reduce
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> operational difficulties of EOS streams apps.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Two questions:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 10)
> >>>>>>>>>>>>>>>>>>>>>> When reading records, we will use the
> >>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB
> >>>>>>>>>>>>>>>>>>>>>       and WriteBatchWithIndex#newIteratorWithBase
> >>>> utilities in
> >>>>>>>>>>>>>> order
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>>>>> that uncommitted writes are available to query.
> >>>>>>>>>>>>>>>>>>>>> Why do extra work to enable the reading of
> uncommitted
> >>>>>>>>>>>>> writes
> >>>>>>>>>>>>>>>>> during
> >>>>>>>>>>>>>>>>>>> IQ?
> >>>>>>>>>>>>>>>>>>>>> Code complexity aside, reading uncommitted writes is,
> >> in
> >>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>> opinion, a
> >>>>>>>>>>>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have
> the
> >>>>>>>>>>>>>>> guarantee
> >>>>>>>>>>>>>>>>> that,
> >>>>>>>>>>>>>>>>>>>>> with EOS, IQ only reads committed records. In order
> to
> >>>>>>>>>>>> avoid
> >>>>>>>>>>>>>>> dirty
> >>>>>>>>>>>>>>>>>>> reads,
> >>>>>>>>>>>>>>>>>>>>> one currently must query a standby replica (but this
> >>>>>>>>>>>> still
> >>>>>>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>>>>>> fully
> >>>>>>>>>>>>>>>>>>>>> guarantee monotonic reads).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 20) Is it also necessary to enable this optimization
> on
> >>>>>>>>>>>> ALOS
> >>>>>>>>>>>>>>>>> stores?
> >>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>> motivation of KIP-844 was mainly to reduce the need
> to
> >>>>>>>>>>>>> restore
> >>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>>>>> scratch on unclean EOS shutdowns; with ALOS it was
> >>>>>>>>>>>>> acceptable
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> tolerate
> >>>>>>>>>>>>>>>>>>>>> that there may have been uncommitted writes on disk.
> >> On a
> >>>>>>>>>>>>> side
> >>>>>>>>>>>>>>>>> note, if
> >>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>> enable this type of store on ALOS processors, the
> >>>>>>>>>>>> community
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>> definitely want to enable queries on dirty reads;
> >>>>>>>>>>>> otherwise
> >>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>> have to wait 30 seconds (default) to see an update.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thank you for doing this fantastic work!
> >>>>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <
> >>>>>>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no
> >>>>>>>>>>>>> longer
> >>>>>>>>>>>>>>>>> include
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> StateStore management of checkpointing. This can be
> >>>>>>>>>>>> added
> >>>>>>>>>>>>>> as a
> >>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>> to further optimize the consistency and performance
> of
> >>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>> stores.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I've also added a section discussing some of the
> >>>>>>>>>>>> concerns
> >>>>>>>>>>>>>>> around
> >>>>>>>>>>>>>>>>>>>>>> concurrency, especially in the presence of
> Iterators.
> >>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>> thinking of
> >>>>>>>>>>>>>>>>>>>>>> wrapping WriteBatchWithIndex with a
> reference-counting
> >>>>>>>>>>>>>>>>> copy-on-write
> >>>>>>>>>>>>>>>>>>>>>> implementation (that only makes a copy if there's an
> >>>>>>>>>>>>> active
> >>>>>>>>>>>>>>>>>>> iterator),
> >>>>>>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>> I'm open to suggestions.
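The reference-counting copy-on-write idea can be sketched as below. This is an assumption-laden illustration: a `TreeMap` stands in for the `WriteBatchWithIndex`, and `CopyOnWriteBufferSketch`, `Version` and `SnapshotIterator` are hypothetical names. Each version of the buffer counts its open iterators, and a write copies the buffer only when that count is non-zero.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

final class CopyOnWriteBufferSketch {

    /** One immutable-once-shared version of the buffer plus its reader count. */
    private static final class Version {
        final TreeMap<String, String> entries;
        int openIterators;

        Version(TreeMap<String, String> entries) {
            this.entries = entries;
        }
    }

    /** Iterator pinned to the version that was current when it was opened. */
    final class SnapshotIterator implements Iterator<Map.Entry<String, String>>, AutoCloseable {
        private final Version version;
        private final Iterator<Map.Entry<String, String>> delegate;
        private boolean closed;

        SnapshotIterator(Version version) {
            this.version = version;
            this.delegate = version.entries.entrySet().iterator();
            version.openIterators++;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public Map.Entry<String, String> next() {
            return delegate.next();
        }

        /** Releases the reference so later writes can mutate in place again. */
        @Override
        public void close() {
            synchronized (CopyOnWriteBufferSketch.this) {
                if (!closed) {
                    closed = true;
                    version.openIterators--;
                }
            }
        }
    }

    private Version current = new Version(new TreeMap<>());
    private int copies;

    /** Copies the buffer first only if an iterator is still reading it. */
    synchronized void put(String key, String value) {
        if (current.openIterators > 0) {
            current = new Version(new TreeMap<>(current.entries));
            copies++;
        }
        current.entries.put(key, value);
    }

    synchronized SnapshotIterator iterator() {
        return new SnapshotIterator(current);
    }

    synchronized int copiesMade() {
        return copies;
    }
}
```

The trade-off is that a long-lived iterator forces one full copy on the next write, but writes with no active iterator pay nothing extra.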
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford <
> >>>>>>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I didn't do any profiling, but the 844
> >>>>>>>>>>>> implementation:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>         - Writes uncommitted records to a temporary
> >>>>>>>>>>>> RocksDB
> >>>>>>>>>>>>>>>>> instance
> >>>>>>>>>>>>>>>>>>>>>>>            - Since tombstones need to be flagged,
> all
> >>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>> values are
> >>>>>>>>>>>>>>>>>>>>>>>            prefixed with a value/tombstone marker.
> >> This
> >>>>>>>>>>>>>>>>> necessitates a
> >>>>>>>>>>>>>>>>>>>>> memory
> >>>>>>>>>>>>>>>>>>>>>> copy.
> >>>>>>>>>>>>>>>>>>>>>>>         - On-commit, iterates all records in this
> >>>>>>>>>>>> temporary
> >>>>>>>>>>>>>>>>> instance and
> >>>>>>>>>>>>>>>>>>>>>>>         writes them to the main RocksDB store.
> >>>>>>>>>>>>>>>>>>>>>>>         - While iterating, the value/tombston
