Here's what I'm thinking: based on Bruno's earlier feedback, I'm going to
try to simplify my original design so that it needs no (or minimal)
changes to the public interface.

If that succeeds, then it should also be possible to transparently
implement the "no memtables" solution as a performance optimisation when
the record cache is enabled. I consider this approach only an optimisation,
because we still need to support stores with the cache disabled.

For that reason, I think the "no memtables" approach would probably best be
suited as a follow-up KIP, but that we keep it in mind during the design of
this one.

What do you think?

Regards,
Nick


On Tue, 20 Jun 2023, 22:26 John Roesler, <vvcep...@apache.org> wrote:

> Oh, that's a good point.
>
> On the topic of a behavioral switch for disabled caches: the typical use
> case for disabling the cache is to cause each individual update to
> propagate down the topology. So another thought might be to just go
> ahead and add the memory we would have used for the memtables to the
> cache size; and if people did disable the cache entirely, then we could
> still go ahead and forward the records on each write?
>
> I know that Guozhang was also proposing for a while to actually decouple
> caching and forwarding, which might provide a way to side-step this
> dilemma (i.e., we just always forward and only apply the cache to state
> and changelog writes).
>
> By the way, I'm basing my statement on why you'd disable caches on
> memory, but also on the guidance here:
>
> https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html
> . That doc also contains a section on how to bound the total memory
> usage across RocksDB memtables, which points to another benefit of
> disabling memtables and managing the write buffer ourselves (simplified
> memory configuration).
>
> Thanks,
> -John
>
> On 6/20/23 16:05, Nick Telford wrote:
> > Potentially we could just go the memtable + RocksDB WriteBatches route
> > if the cache is disabled?
> >
> > On Tue, 20 Jun 2023, 22:00 John Roesler, <j...@vvcephei.org> wrote:
> >
> >> Touché!
> >>
> >> Ok, I agree that figuring out the case of a disabled cache would be
> >> non-trivial. Ingesting single-record SST files will probably not be
> >> performant, but benchmarking may prove different. Or maybe we can have
> >> some reserved cache space on top of the user-configured cache, which we
> >> would have reclaimed from the memtable space. Or some other, more
> >> creative solution.
> >>
> >> Thanks,
> >> -John
> >>
> >> On 6/20/23 15:30, Nick Telford wrote:
> >>>> Note that users can disable the cache, which would still be
> >>> ok, I think. We wouldn't ingest the SST files on every record, but just
> >>> append to them and only ingest them on commit, when we're already
> >>> waiting for acks and a RocksDB commit.
> >>>
> >>> In this case, how would uncommitted records be read by joins?
> >>>
> >>> On Tue, 20 Jun 2023, 20:51 John Roesler, <vvcep...@apache.org> wrote:
> >>>
> >>>> Ah, sorry Nick,
> >>>>
> >>>> I just meant the regular heap based cache that we maintain in
> Streams. I
> >>>> see that it's not called "RecordCache" (my mistake).
> >>>>
> >>>> The actual cache is ThreadCache:
> >>>>
> >>>>
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
> >>>>
> >>>> Here's the example of how we use the cache in KeyValueStore:
> >>>>
> >>>>
> >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
> >>>>
> >>>> It's basically just an on-heap Map of records that have not yet been
> >>>> written to the changelog or flushed into the underlying store. It gets
> >>>> flushed when the total cache size exceeds `cache.max.bytes.buffering`
> or
> >>>> the `commit.interval.ms` elapses.
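> >>>>
> >>>> For concreteness, those are the two knobs I mean; a minimal sketch of
> >>>> setting them (the values here are just examples):
> >>>>
> >>>>   import java.util.Properties;
> >>>>   import org.apache.kafka.streams.StreamsConfig;
> >>>>
> >>>>   class CacheConfigSketch {
> >>>>       static Properties props() {
> >>>>           final Properties props = new Properties();
> >>>>           // flush once the cache holds ~10 MiB of dirty entries...
> >>>>           props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
> >>>>           // ...or at the latest when the commit interval elapses
> >>>>           props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);
> >>>>           return props;
> >>>>       }
> >>>>   }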
> >>>>
> >>>> Speaking of those configs, another benefit to this idea is that we
> would
> >>>> no longer need to trigger extra commits based on the size of the
> ongoing
> >>>> transaction. Instead, we'd just preserve the existing cache-flush
> >>>> behavior. Note that users can disable the cache, which would still be
> >>>> ok, I think. We wouldn't ingest the SST files on every record, but
> just
> >>>> append to them and only ingest them on commit, when we're already
> >>>> waiting for acks and a RocksDB commit.
> >>>>
> >>>> Thanks,
> >>>> -John
> >>>>
> >>>> On 6/20/23 14:09, Nick Telford wrote:
> >>>>> Hi John,
> >>>>>
> >>>>> By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find
> >> any
> >>>>> class called "RecordCache"...
> >>>>>
> >>>>> Cheers,
> >>>>>
> >>>>> Nick
> >>>>>
> >>>>> On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org>
> >> wrote:
> >>>>>
> >>>>>> Hi Nick,
> >>>>>>
> >>>>>> Thanks for picking this up again!
> >>>>>>
> >>>>>> I did have one new thought over the intervening months, which I'd
> like
> >>>>>> your take on.
> >>>>>>
> >>>>>> What if, instead of using the RocksDB atomic write primitive at all,
> >> we
> >>>>>> instead just:
> >>>>>> 1. disable memtables entirely
> >>>>>> 2. directly write the RecordCache into SST files when we flush
> >>>>>> 3. atomically ingest the SST file(s) into RocksDB when we get the
> ACK
> >>>>>> from the changelog (see
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
> >>>>>> and
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
> >>>>>> and
> >>>>>>
> >>>>>>
> >>>>
> >>
> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
> >>>>>> )
> >>>>>> 4. track the changelog offsets either in another CF or the same CF
> >> with
> >>>>>> a reserved key, either of which will make the changelog offset
> update
> >>>>>> atomic with the file ingestions
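> >>>>>>
> >>>>>> Roughly, I'd imagine steps 2-4 looking something like the following
> >>>>>> (an untested sketch against the RocksJava API; the reserved offset
> >>>>>> key and its placement in the SST's sort order are hand-waved here):
> >>>>>>
> >>>>>>   import java.nio.ByteBuffer;
> >>>>>>   import java.util.Collections;
> >>>>>>   import java.util.Map;
> >>>>>>   import java.util.SortedMap;
> >>>>>>   import org.rocksdb.*;
> >>>>>>
> >>>>>>   class SstFlushSketch {
> >>>>>>       // 2. write the (sorted) dirty cache entries into an SST file
> >>>>>>       static void writeSst(final SortedMap<byte[], byte[]> dirty,
> >>>>>>                            final long changelogOffset,
> >>>>>>                            final Options options,
> >>>>>>                            final String sstPath) throws RocksDBException {
> >>>>>>           try (final EnvOptions envOptions = new EnvOptions();
> >>>>>>                final SstFileWriter writer = new SstFileWriter(envOptions, options)) {
> >>>>>>               writer.open(sstPath);
> >>>>>>               for (final Map.Entry<byte[], byte[]> e : dirty.entrySet()) {
> >>>>>>                   writer.put(e.getKey(), e.getValue()); // keys must be added in sorted order
> >>>>>>               }
> >>>>>>               // 4. reserved key carrying the changelog offset, so the
> >>>>>>               //    offset update is part of the same atomic ingestion
> >>>>>>               writer.put("\uFFFF__offset".getBytes(),
> >>>>>>                          ByteBuffer.allocate(Long.BYTES).putLong(changelogOffset).array());
> >>>>>>               writer.finish();
> >>>>>>           }
> >>>>>>       }
> >>>>>>
> >>>>>>       // 3. atomically ingest the file once the changelog ACK arrives
> >>>>>>       static void ingest(final RocksDB db, final String sstPath) throws RocksDBException {
> >>>>>>           try (final IngestExternalFileOptions opts = new IngestExternalFileOptions()) {
> >>>>>>               opts.setMoveFiles(true); // move rather than copy the file
> >>>>>>               db.ingestExternalFile(Collections.singletonList(sstPath), opts);
> >>>>>>           }
> >>>>>>       }
> >>>>>>   }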
> >>>>>>
> >>>>>> I suspect this'll have a number of benefits:
> >>>>>> * writes to RocksDB will always be atomic
> >>>>>> * we don't fragment memory between the RecordCache and the memtables
> >>>>>> * RecordCache gives far higher performance than memtable for reads
> and
> >>>>>> writes
> >>>>>> * we don't need any new "transaction" concepts or memory bound
> configs
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> -John
> >>>>>>
> >>>>>> On 6/20/23 10:51, Nick Telford wrote:
> >>>>>>> Hi Bruno,
> >>>>>>>
> >>>>>>> Thanks for reviewing the KIP. It's been a long road: I started
> >>>>>>> working on this more than a year ago, and most of the time in the
> >>>>>>> last 6 months has been spent on the "Atomic Checkpointing" stuff
> >>>>>>> that's been benched, so some of the reasoning behind some of my
> >>>>>>> decisions has been lost, but I'll do my best to reconstruct it.
> >>>>>>>
> >>>>>>> 1.
> >>>>>>> IIRC, this was the initial approach I tried. I don't remember the
> >> exact
> >>>>>>> reasons I changed it to use a separate "view" of the StateStore
> that
> >>>>>>> encapsulates the transaction, but I believe it had something to do
> >> with
> >>>>>>> concurrent access to the StateStore from Interactive Query threads.
> >>>> Reads
> >>>>>>> from interactive queries need to be isolated from the currently
> >> ongoing
> >>>>>>> transaction, both for consistency (so interactive queries don't
> >> observe
> >>>>>>> changes that are subsequently rolled back), and to prevent
> >>>> Iterators
> >>>>>>> opened by an interactive query from being closed and invalidated by
> >> the
> >>>>>>> StreamThread when it commits the transaction, which causes your
> >>>>>> interactive
> >>>>>>> queries to crash.
> >>>>>>>
> >>>>>>> Another reason I believe I implemented it this way was a separation
> >> of
> >>>>>>> concerns. Recall that newTransaction() originally created an object
> >> of
> >>>>>> type
> >>>>>>> Transaction, not StateStore. My intent was to improve the
> type-safety
> >>>> of
> >>>>>>> the API, in an effort to ensure Transactions weren't used
> >> incorrectly.
> >>>>>>> Unfortunately, this didn't pan out, but newTransaction() remained.
> >>>>>>>
> >>>>>>> Finally, this had the added benefit that implementations could
> easily
> >>>> add
> >>>>>>> support for transactions *without* re-writing their existing,
> >>>>>>> non-transactional implementation. I think this can be a benefit
> both
> >>>> for
> >>>>>>> implementers of custom StateStores, but also for anyone extending
> >>>>>>> RocksDbStore, as they can rely on the existing access methods
> working
> >>>> how
> >>>>>>> they expect them to.
> >>>>>>>
> >>>>>>> I'm not too happy with the way the current design has panned out,
> so
> >>>> I'm
> >>>>>>> open to ideas on how to improve it. Key to this is finding some way
> >> to
> >>>>>>> ensure that reads from Interactive Query threads are properly
> >> isolated
> >>>>>> from
> >>>>>>> the transaction, *without* the performance overhead of checking
> which
> >>>>>>> thread the method is being called from on every access.
> >>>>>>>
> >>>>>>> As for replacing flush() with commit() - I saw no reason to add
> this
> >>>>>>> complexity to the KIP, unless there was a need to add arguments to
> >> the
> >>>>>>> flush/commit method. This need arises with Atomic Checkpointing,
> but
> >>>> that
> >>>>>>> will be implemented separately, in a future KIP. Do you see a need
> >> for
> >>>>>> some
> >>>>>>> arguments to the flush/commit method that I've missed? Or were you
> >>>> simply
> >>>>>>> suggesting a rename?
> >>>>>>>
> >>>>>>> 2.
> >>>>>>> This is simply due to the practical reason that isolationLevel() is
> >>>>>> really
> >>>>>>> a proxy for checking if the app is under EOS. The application
> >>>>>> configuration
> >>>>>>> is not provided to the constructor of StateStores, but it *is*
> >> provided
> >>>>>> to
> >>>>>>> init(), via StateStoreContext. For this reason, it seemed somewhat
> >>>>>> natural
> >>>>>>> to add it to StateStoreContext. I think this makes sense, since the
> >>>>>>> IsolationLevel of all StateStores in an application *must* be the
> >> same,
> >>>>>> and
> >>>>>>> since those stores are all initialized with the same
> >> StateStoreContext,
> >>>>>> it
> >>>>>>> seems natural for that context to carry the desired IsolationLevel
> to
> >>>>>> use.
> >>>>>>>
> >>>>>>> 3.
> >>>>>>> Using IsolationLevel instead of just passing `boolean eosEnabled`,
> >> like
> >>>>>>> much of the internals does, was an attempt to logically de-couple the
> >>>>>> StateStore
> >>>>>>> API from the internals of Kafka Streams. Technically, StateStores
> >> don't
> >>>>>>> need to know/care what processing mode the KS app is using, all
> they
> >>>> need
> >>>>>>> to know is the isolation level expected of them.
> >>>>>>>
> >>>>>>> Having formal definitions for the expectations of the two required
> >>>>>>> IsolationLevels allows implementers to implement transactional
> stores
> >>>>>>> without having to dig through the internals of Kafka Streams and
> >>>>>> understand
> >>>>>>> exactly how they are used. The tight coupling between state stores
> >> and
> >>>>>>> internal behaviour has actually significantly hindered my progress
> on
> >>>>>> this
> >>>>>>> KIP, and encouraged me to avoid increasing this logical coupling as
> >>>> much
> >>>>>> as
> >>>>>>> possible.
> >>>>>>>
> >>>>>>> This also frees implementations to satisfy those requirements in
> any
> >>>> way
> >>>>>>> they choose. Transactions might not be the only approach available
> >>>>>>> to an implementation, which might have an alternative way to satisfy
> the
> >>>>>>> isolation requirements. I admit that this point is more about
> >>>> semantics,
> >>>>>>> but "transactional" would need to be formally defined in order for
> >>>>>>> implementers to provide a valid implementation, and these
> >>>> IsolationLevels
> >>>>>>> provide that formal definition.
> >>>>>>>
> >>>>>>> 4.
> >>>>>>> I can remove them. I added them only as I planned to include them
> in
> >>>> the
> >>>>>>> org.apache.kafka.streams.state package, as a recommended base
> >>>>>>> implementation for all StateStores, including those implemented by
> >>>>>> users. I
> >>>>>>> had assumed that anything in "public" packages, such as
> >>>>>>> org.apache.kafka.streams.state, should be included in a KIP. Is
> that
> >>>>>> wrong?
> >>>>>>>
> >>>>>>> 5.
> >>>>>>> RocksDB provides no way to measure the actual size of a
> >>>>>>> WriteBatch(WithIndex), so we're limited to tracking the sum total
> of
> >>>> the
> >>>>>>> size of keys + values that are written to the transaction. This
> >>>> obviously
> >>>>>>> under-estimates the actual memory usage, because WriteBatch
> no doubt
> >>>>>>> includes some record overheads, and WriteBatchWithIndex has to
> >> maintain
> >>>>>> an
> >>>>>>> index.
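> >>>>>>>
> >>>>>>> For illustration, the kind of tracking I mean is roughly this
> >>>>>>> (sketch only; the real wrapper lives inside the store implementation):
> >>>>>>>
> >>>>>>>   import org.rocksdb.RocksDBException;
> >>>>>>>   import org.rocksdb.WriteBatchWithIndex;
> >>>>>>>
> >>>>>>>   class ApproximateSizeTrackingBatch {
> >>>>>>>       private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
> >>>>>>>       private long approximateBytes = 0L;
> >>>>>>>
> >>>>>>>       void put(final byte[] key, final byte[] value) throws RocksDBException {
> >>>>>>>           batch.put(key, value);
> >>>>>>>           // under-estimates: ignores per-record overhead and the WBWI index
> >>>>>>>           approximateBytes += key.length + value.length;
> >>>>>>>       }
> >>>>>>>
> >>>>>>>       long approximateNumUncommittedBytes() {
> >>>>>>>           return approximateBytes;
> >>>>>>>       }
> >>>>>>>   }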
> >>>>>>>
> >>>>>>> Ideally, we could trivially add a method upstream to
> >>>> WriteBatchInterface
> >>>>>>> that provides the exact size of the batch, but that would require
> an
> >>>>>>> upgrade of RocksDB, which won't happen soon. So for the time being,
> >>>> we're
> >>>>>>> stuck with an approximation, so I felt that the new method should
> >>>> reflect
> >>>>>>> that.
> >>>>>>>
> >>>>>>> Would you prefer the new method name ignores this constraint and
> that
> >>>> we
> >>>>>>> simply make the rocks measurement more accurate in the future?
> >>>>>>>
> >>>>>>> 6.
> >>>>>>> Done
> >>>>>>>
> >>>>>>> 7.
> >>>>>>> Very good point. The KIP already specifically calls out memory in
> the
> >>>>>>> documentation of the config: "Maximum number of memory bytes to be
> >> used
> >>>>>> to
> >>>>>>> buffer uncommitted state-store records." - did you have something
> >> else
> >>>> in
> >>>>>>> mind?
> >>>>>>>
> >>>>>>> Should we also make this clearer by renaming the config property
> >>>> itself?
> >>>>>>> Perhaps to something like statestore.transaction.buffer.max.bytes?
> >>>>>>>
> >>>>>>> 8.
> >>>>>>> OK, I can remove this. The intent here was to describe how Streams
> >>>> itself
> >>>>>>> will manage transaction roll-over etc. Presumably that means we
> also
> >>>>>> don't
> >>>>>>> need a description of how Streams will manage the commit of
> changelog
> >>>>>>> transactions, state store transactions and checkpointing?
> >>>>>>>
> >>>>>>> 9.
> >>>>>>> What do you mean by fail-over? Do you mean failing over an Active
> >> Task
> >>>> to
> >>>>>>> an instance already hosting a Standby Task?
> >>>>>>>
> >>>>>>> Thanks again and sorry for the essay of a response!
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Nick
> >>>>>>>
> >>>>>>> On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Nick,
> >>>>>>>>
> >>>>>>>> Thanks for the updates!
> >>>>>>>>
> >>>>>>>> I really appreciate that you simplified the KIP by removing some
> >>>>>>>> aspects. As I have already told you, I think the removed aspects
> are
> >>>>>>>> also good ideas and we can discuss them on follow-up KIPs.
> >>>>>>>>
> >>>>>>>> Regarding the current KIP, I have the following feedback.
> >>>>>>>>
> >>>>>>>> 1.
> >>>>>>>> Is there a good reason to add method newTransaction() to the
> >>>> StateStore
> >>>>>>>> interface? As far as I understand, the idea is that users of a
> state
> >>>>>>>> store (transactional or not) call this method at start-up and
> after
> >>>> each
> >>>>>>>> commit. Since the call to newTransaction() is done in any case,
> >>>>>>>> wouldn't it simplify the caller code if we just started a new
> >>>>>>>> transaction after a commit inside the implementation?
> >>>>>>>> As far as I understand, you plan to commit the transaction in the
> >>>>>>>> flush() method. I find the idea to replace flush() with commit()
> >>>>>>>> presented in KIP-844 an elegant solution.
> >>>>>>>>
> >>>>>>>> 2.
> >>>>>>>> Why is the method to query the isolation level added to the state
> >>>> store
> >>>>>>>> context?
> >>>>>>>>
> >>>>>>>> 3.
> >>>>>>>> Do we need all the isolation level definitions? I think it is good
> >> to
> >>>>>>>> know the guarantees of the transactionality of the state store.
> >>>>>>>> However, currently, Streams guarantees that there will only be one
> >>>>>>>> transaction that writes to the state store. Only the stream thread
> >>>> that
> >>>>>>>> executes the active task that owns the state store will write to
> the
> >>>>>>>> state store. I think it should be enough to know if the state
> store
> >> is
> >>>>>>>> transactional or not. So my proposal would be to just add a method
> >> on
> >>>>>>>> the state store interface that returns whether a state store is
> >>>> transactional
> >>>>>>>> or not by returning a boolean or an enum.
> >>>>>>>>
> >>>>>>>> 4.
> >>>>>>>> I am wondering why AbstractTransaction and
> >> AbstractTransactionalStore
> >>>>>>>> are part of the KIP. They look like implementation details that
> >> should
> >>>>>>>> not be exposed in the public API.
> >>>>>>>>
> >>>>>>>> 5.
> >>>>>>>> Why does StateStore#approximateNumUncommittedBytes() return an
> >>>>>>>> approximate number of bytes?
> >>>>>>>>
> >>>>>>>> 6.
> >>>>>>>> RocksDB is just one implementation of the state stores in Streams.
> >>>>>>>> However, the issues regarding OOM errors might also apply to other
> >>>>>>>> custom implementations. So in the KIP I would extract that part
> from
> >>>>>>>> section "RocksDB Transaction". I would also move section "RocksDB
> >>>>>>>> Transaction" to the end of section "Proposed Changes" and handle
> it
> >> as
> >>>>>>>> an example implementation for a state store.
> >>>>>>>>
> >>>>>>>> 7.
> >>>>>>>> Should statestore.uncommitted.max.bytes only limit the uncommitted
> >>>> bytes
> >>>>>>>> or the uncommitted bytes that reside in memory? In future, other
> >>>>>>>> transactional state store implementations might implement a buffer
> >> for
> >>>>>>>> uncommitted records that is able to spill records to disk. I
> think
> >>>>>>>> statestore.uncommitted.max.bytes needs to limit the uncommitted
> >> bytes
> >>>>>>>> irrespective of whether they reside in memory or on disk. Since Streams will
> >> use
> >>>>>>>> this config to decide if it needs to trigger a commit, state store
> >>>>>>>> implementations that can spill to disk will never be able to spill
> >> to
> >>>>>>>> disk. You would only need to change the doc of the config, if you
> >>>> agree
> >>>>>>>> with me.
> >>>>>>>>
> >>>>>>>> 8.
> >>>>>>>> Section "Transaction Management" about the wrappers is rather a
> >>>>>>>> implementation detail that should not be in the KIP.
> >>>>>>>>
> >>>>>>>> 9.
> >>>>>>>> Could you add a section that describes how failover will work with
> >> the
> >>>>>>>> transactional state stores? I think section "Error handling" is
> >>>> already
> >>>>>>>> a good start.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 15.05.23 11:04, Nick Telford wrote:
> >>>>>>>>> Hi everyone,
> >>>>>>>>>
> >>>>>>>>> Quick update: I've added a new section to the KIP: "Offsets for
> >>>>>> Consumer
> >>>>>>>>> Rebalances", that outlines my solution to the problem that
> >>>>>>>>> StreamsPartitionAssignor needs to read StateStore offsets even if
> >>>>>> they're
> >>>>>>>>> not currently open.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Nick
> >>>>>>>>>
> >>>>>>>>> On Wed, 3 May 2023 at 11:34, Nick Telford <
> nick.telf...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Bruno,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for reviewing my proposal.
> >>>>>>>>>>
> >>>>>>>>>> 1.
> >>>>>>>>>> The main reason I added it was because it was easy to do. If we
> >> see
> >>>> no
> >>>>>>>>>> value in it, I can remove it.
> >>>>>>>>>>
> >>>>>>>>>> 2.
> >>>>>>>>>> Global StateStores can have multiple partitions in their input
> >>>> topics
> >>>>>>>>>> (which function as their changelogs), so they would have more
> than
> >>>> one
> >>>>>>>>>> partition.
> >>>>>>>>>>
> >>>>>>>>>> 3.
> >>>>>>>>>> That's a good point. At present, the only method it adds is
> >>>>>>>>>> isolationLevel(), which is likely not necessary outside of
> >>>>>> StateStores.
> >>>>>>>>>> It *does* provide slightly different guarantees in the
> >> documentation
> >>>>>> to
> >>>>>>>>>> several of the methods (hence the overrides). I'm not sure if
> this
> >>>> is
> >>>>>>>>>> enough to warrant a new interface though.
> >>>>>>>>>> I think the question that remains is whether this interface
> makes
> >> it
> >>>>>>>>>> easier to implement custom transactional StateStores than if we
> >> were
> >>>>>> to
> >>>>>>>>>> remove it? Probably not.
> >>>>>>>>>>
> >>>>>>>>>> 4.
> >>>>>>>>>> The main motivation for the Atomic Checkpointing is actually
> >>>>>>>> performance.
> >>>>>>>>>> My team has been testing out an implementation of this KIP
> without
> >>>> it,
> >>>>>>>> and
> >>>>>>>>>> we had problems with RocksDB doing *much* more compaction, due
> to
> >>>> the
> >>>>>>>>>> significantly increased flush rate. It was enough of a problem
> >> that
> >>>>>> (for
> >>>>>>>>>> the time being), we had to revert back to Kafka Streams proper.
> >>>>>>>>>> I think the best way to solve this, as you say, is to keep the
> >>>>>>>> .checkpoint
> >>>>>>>>>> files *in addition* to the offsets being stored within the store
> >>>>>> itself.
> >>>>>>>>>> Essentially, when closing StateStores, we force a memtable
> flush,
> >>>> then
> >>>>>>>>>> call getCommittedOffsets and write those out to the .checkpoint
> >>>> file.
> >>>>>>>>>> That would ensure the metadata is available to the
> >>>>>>>>>> StreamsPartitionAssignor for all closed stores.
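> >>>>>>>>>>
> >>>>>>>>>> i.e. something like this on clean close, given the store and its
> >>>>>>>>>> .checkpoint File (sketch only; getCommittedOffsets() is the new
> >>>>>>>>>> method proposed by this KIP, and OffsetCheckpoint is the existing
> >>>>>>>>>> Streams helper for .checkpoint files):
> >>>>>>>>>>
> >>>>>>>>>>   store.flush();                                       // force the memtable flush
> >>>>>>>>>>   final Map<TopicPartition, Long> offsets = store.getCommittedOffsets();
> >>>>>>>>>>   new OffsetCheckpoint(checkpointFile).write(offsets); // existing .checkpoint writer
> >>>>>>>>>>   store.close();
> >>>>>>>>>>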
> >>>>>>>>>> If there's a crash (no clean close), then we won't be able to
> >>>>>> guarantee
> >>>>>>>>>> which offsets were flushed to disk by RocksDB, so we'd need to
> >> open
> >>>> (
> >>>>>>>>>> init()), read offsets, and then close() those stores. But since
> >> this
> >>>>>> is
> >>>>>>>>>> the exception, and will only occur once (provided it doesn't
> crash
> >>>>>> every
> >>>>>>>>>> time!), I think the performance impact here would be acceptable.
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the feedback, please let me know if you have any more
> >>>>>>>> comments
> >>>>>>>>>> or questions!
> >>>>>>>>>>
> >>>>>>>>>> I'm currently working on rebasing against trunk. This involves
> >>>> adding
> >>>>>>>>>> support for transactionality to VersionedStateStores. I will
> >>>> probably
> >>>>>>>> need
> >>>>>>>>>> to revise my implementation for transactional "segmented"
> stores,
> >>>> both
> >>>>>>>> to
> >>>>>>>>>> accommodate VersionedStateStore, and to clean up some other
> stuff.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Nick
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org>
> >>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the updates!
> >>>>>>>>>>>
> >>>>>>>>>>> I have a couple of questions/comments.
> >>>>>>>>>>>
> >>>>>>>>>>> 1.
> >>>>>>>>>>> Why do you propose a configuration that involves max. bytes and
> >>>> max.
> >>>>>>>>>>> records? I think we are mainly concerned about memory
> consumption
> >>>>>>>> because
> >>>>>>>>>>> we want to limit the off-heap memory used. I cannot think of a
> >> case
> >>>>>>>>>>> where one would want to set the max. number of records.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> Why does
> >>>>>>>>>>>
> >>>>>>>>>>>        default void commit(final Map<TopicPartition, Long> changelogOffsets) {
> >>>>>>>>>>>            flush();
> >>>>>>>>>>>        }
> >>>>>>>>>>>
> >>>>>>>>>>> take a map of partitions to changelog offsets?
> >>>>>>>>>>> The mapping between state stores to partitions is a 1:1
> >>>> relationship.
> >>>>>>>>>>> Passing in a single changelog offset should suffice.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 3.
> >>>>>>>>>>> Why do we need the Transaction interface? It should be possible
> >> to
> >>>>>> hide
> >>>>>>>>>>> beginning and committing a transaction within the state store
> >>>>>>>>>>> implementation, so that from outside the state store, it does
> not
> >>>>>>>> matter
> >>>>>>>>>>> whether the state store is transactional or not. What would be
> >> the
> >>>>>>>>>>> advantage of using the Transaction interface?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 4.
> >>>>>>>>>>> Regarding checkpointing offsets, I think we should keep the
> >>>>>> checkpoint
> >>>>>>>>>>> file in any case for the reason you mentioned about
> rebalancing.
> >>>> Even
> >>>>>>>> if
> >>>>>>>>>>> that would not be an issue, I would propose to move the change
> to
> >>>>>>>> offset
> >>>>>>>>>>> management to a new KIP and to not add more complexity than
> >> needed
> >>>> to
> >>>>>>>>>>> this one. I would not be too concerned about the consistency
> >>>>>> violation
> >>>>>>>>>>> you mention. As far as I understand, with transactional state
> >>>> stores
> >>>>>>>>>>> Streams would write the checkpoint file during every commit
> even
> >>>>>> under
> >>>>>>>>>>> EOS. In the failure case you describe, Streams would restore
> the
> >>>>>> state
> >>>>>>>>>>> stores from the offsets found in the checkpoint file written
> >> during
> >>>>>> the
> >>>>>>>>>>> penultimate commit instead of during the last commit.
> Basically,
> >>>>>>>> Streams
> >>>>>>>>>>> would overwrite the records written to the state store between
> >> the
> >>>>>> last
> >>>>>>>>>>> two commits with the same records read from the changelogs.
> >> While I
> >>>>>>>>>>> understand that this is wasteful, it is -- at the same time --
> >>>>>>>>>>> acceptable and most importantly it does not break EOS.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 27.04.23 12:34, Nick Telford wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I find myself (again) considering removing the offset
> management
> >>>>>> from
> >>>>>>>>>>>> StateStores, and keeping the old checkpoint file system. The
> >>>> reason
> >>>>>> is
> >>>>>>>>>>> that
> >>>>>>>>>>>> the StreamPartitionAssignor directly reads checkpoint files in
> >>>> order
> >>>>>>>> to
> >>>>>>>>>>>> determine which instance has the most up-to-date copy of the
> >> local
> >>>>>>>>>>> state.
> >>>>>>>>>>>> If we move offsets into the StateStore itself, then we will
> need
> >>>> to
> >>>>>>>>>>> open,
> >>>>>>>>>>>> initialize, read offsets and then close each StateStore (that
> is
> >>>> not
> >>>>>>>>>>>> already assigned and open) for which we have *any* local
> state,
> >> on
> >>>>>>>> every
> >>>>>>>>>>>> rebalance.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Generally, I don't think there are many "orphan" stores like
> >> this
> >>>>>>>>>>> sitting
> >>>>>>>>>>>> around on most instances, but even a few would introduce
> >>>> additional
> >>>>>>>>>>> latency
> >>>>>>>>>>>> to an already somewhat lengthy rebalance procedure.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping
> >>>> things
> >>>>>>>> in
> >>>>>>>>>>> the
> >>>>>>>>>>>> checkpoint file(s) for now, and not worrying about the race.
> The
> >>>>>>>>>>> downside
> >>>>>>>>>>>> is that we wouldn't be able to remove the explicit RocksDB
> flush
> >>>>>>>>>>> on-commit,
> >>>>>>>>>>>> which likely hurts performance.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If anyone has any thoughts or ideas on this subject, I would
> >>>>>>>> appreciate
> >>>>>>>>>>> it!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Nick
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford <
> >>>> nick.telf...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The issue is that if there's a crash between 2 and 3, then
> you
> >>>>>> still
> >>>>>>>>>>> end
> >>>>>>>>>>>>> up with inconsistent data in RocksDB. The only way to
> guarantee
> >>>>>> that
> >>>>>>>>>>> your
> >>>>>>>>>>>>> checkpoint offsets and locally stored data are consistent
> with
> >>>> each
> >>>>>>>>>>> other
> >>>>>>>>>>>>> is to atomically commit them, which can be achieved by
> having
> >>>> the
> >>>>>>>>>>> offsets
> >>>>>>>>>>>>> stored in RocksDB.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The offsets column family is likely to be extremely small
> (one
> >>>>>>>>>>>>> per-changelog partition + one per Topology input partition
> for
> >>>>>>>> regular
> >>>>>>>>>>>>> stores, one per input partition for global stores). So the
> >>>> overhead
> >>>>>>>>>>> will be
> >>>>>>>>>>>>> minimal.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> A major benefit of doing this is that we can remove the
> >> explicit
> >>>>>>>> calls
> >>>>>>>>>>> to
> >>>>>>>>>>>>> db.flush(), which forcibly flushes memtables to disk
> on-commit.
> >>>> It
> >>>>>>>>>>> turns
> >>>>>>>>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka
> >>>> Streams
> >>>>>>>>>>>>> commits, *not* RocksDB configuration, which could be a major
> >>>> source
> >>>>>>>> of
> >>>>>>>>>>>>> confusion. Atomic checkpointing makes it safe to remove these
> >>>>>>>> explicit
> >>>>>>>>>>>>> flushes, because it no longer matters exactly when RocksDB
> >>>> flushes
> >>>>>>>>>>> data to
> >>>>>>>>>>>>> disk; since the data and corresponding checkpoint offsets
> will
> >>>>>> always
> >>>>>>>>>>> be
> >>>>>>>>>>>>> flushed together, the local store is always in a consistent
> >>>> state,
> >>>>>>>> and
> >>>>>>>>>>>>> on-restart, it can always safely resume restoration from the
> >>>>>> on-disk
> >>>>>>>>>>>>> offsets, restoring the small amount of data that hadn't been
> >>>>>> flushed
> >>>>>>>>>>> when
> >>>>>>>>>>>>> the app exited/crashed.
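> >>>>>>>>>>>>>
> >>>>>>>>>>>>> To illustrate (rough sketch, names are mine): the offsets live in
> >>>>>>>>>>>>> their own column family, but are written in the same WriteBatch as
> >>>>>>>>>>>>> the data, so the offset update is atomic with the data write:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   import java.nio.ByteBuffer;
> >>>>>>>>>>>>>   import java.nio.charset.StandardCharsets;
> >>>>>>>>>>>>>   import org.rocksdb.*;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   class AtomicCheckpointSketch {
> >>>>>>>>>>>>>       static void write(final RocksDB db,
> >>>>>>>>>>>>>                         final ColumnFamilyHandle dataCf,
> >>>>>>>>>>>>>                         final ColumnFamilyHandle offsetsCf,
> >>>>>>>>>>>>>                         final byte[] key,
> >>>>>>>>>>>>>                         final byte[] value,
> >>>>>>>>>>>>>                         final String changelogPartition,
> >>>>>>>>>>>>>                         final long changelogOffset) throws RocksDBException {
> >>>>>>>>>>>>>           try (final WriteBatch batch = new WriteBatch();
> >>>>>>>>>>>>>                final WriteOptions writeOptions = new WriteOptions()) {
> >>>>>>>>>>>>>               batch.put(dataCf, key, value);
> >>>>>>>>>>>>>               batch.put(offsetsCf,
> >>>>>>>>>>>>>                         changelogPartition.getBytes(StandardCharsets.UTF_8),
> >>>>>>>>>>>>>                         ByteBuffer.allocate(Long.BYTES).putLong(changelogOffset).array());
> >>>>>>>>>>>>>               db.write(writeOptions, batch); // both column families, atomically
> >>>>>>>>>>>>>           }
> >>>>>>>>>>>>>       }
> >>>>>>>>>>>>>   }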
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <
> >> c...@littlehorse.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for your reply. Ack to A) and B).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For item C), I see what you're referring to. Your proposed
> >>>>>> solution
> >>>>>>>>>>> will
> >>>>>>>>>>>>>> work, so no need to change it. What I was suggesting was
> that
> >> it
> >>>>>>>>>>> might be
> >>>>>>>>>>>>>> possible to achieve this with only one column family. So
> long
> >>>> as:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>          - No uncommitted records (i.e. not committed to the
> >>>>>> changelog)
> >>>>>>>> are
> >>>>>>>>>>>>>>          *committed* to the state store, AND
> >>>>>>>>>>>>>>          - The Checkpoint offset (which refers to the
> changelog
> >>>>>> topic)
> >>>>>>>> is
> >>>>>>>>>>> less
> >>>>>>>>>>>>>>          than or equal to the last written changelog offset
> in
> >>>>>> rocksdb
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I don't see the need to do the full restoration from
> scratch.
> >> My
> >>>>>>>>>>>>>> understanding was that prior to 844/892, full restorations
> >> were
> >>>>>>>>>>> required
> >>>>>>>>>>>>>> because there could be uncommitted records written to
> RocksDB;
> >>>>>>>>>>> however,
> >>>>>>>>>>>>>> given your use of RocksDB transactions, that can be avoided
> >> with
> >>>>>> the
> >>>>>>>>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB
> >>>>>>>>>>> transaction, 3)
> >>>>>>>>>>>>>> update offset in checkpoint file.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Anyways, your proposed solution works equivalently and I
> don't
> >>>>>>>> believe
> >>>>>>>>>>>>>> there is much overhead to an additional column family in
> >>>> RocksDB.
> >>>>>>>>>>> Perhaps
> >>>>>>>>>>>>>> it may even perform better than making separate writes to
> the
> >>>>>>>>>>> checkpoint
> >>>>>>>>>>>>>> file.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <
> >>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> A. I've done my best to de-couple the StateStore stuff from
> >> the
> >>>>>>>> rest
> >>>>>>>>>>> of
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> Streams engine. The fact that there will be only one
> ongoing
> >>>>>>>> (write)
> >>>>>>>>>>>>>>> transaction at a time is not guaranteed by any API, and is
> >>>> just a
> >>>>>>>>>>>>>>> consequence of the way Streams operates. To that end, I
> tried
> >>>> to
> >>>>>>>>>>> ensure
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> documentation and guarantees provided by the new APIs are
> >>>>>>>>>>> independent of
> >>>>>>>>>>>>>>> this incidental behaviour. In practice, you're right, this
> >>>>>>>>>>> essentially
> >>>>>>>>>>>>>>> refers to "interactive queries", which are technically
> "read
> >>>>>>>>>>>>>> transactions",
> >>>>>>>>>>>>>>> even if they don't actually use the transaction API to
> >> isolate
> >>>>>>>>>>>>>> themselves.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> B. Yes, although not ideal. This is for backwards
> >>>> compatibility,
> >>>>>>>>>>>>>> because:
> >>>>>>>>>>>>>>>           1) Existing custom StateStore implementations
> will
> >>>>>> implement
> >>>>>>>>>>>>>> flush(),
> >>>>>>>>>>>>>>> and not commit(), but the Streams engine now calls
> commit(),
> >> so
> >>>>>>>> those
> >>>>>>>>>>>>>> calls
> >>>>>>>>>>>>>>> need to be forwarded to flush() for these legacy stores.
> >>>>>>>>>>>>>>>           2) Existing StateStore *users*, i.e. outside of
> the
> >>>>>> Streams
> >>>>>>>>>>> engine
> >>>>>>>>>>>>>>> itself, may depend on explicitly calling flush(), so for
> >> these
> >>>>>>>> cases,
> >>>>>>>>>>>>>>> flush() needs to be redirected to call commit().
> >>>>>>>>>>>>>>> If anyone has a better way to guarantee compatibility
> without
> >>>>>>>>>>>>>> introducing
> >>>>>>>>>>>>>>> this potential recursion loop, I'm open to changes!
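> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Roughly, the two defaults on the StateStore interface look like
> >>>>>>>>>>>>>>> this (simplified), which is where the potential recursion comes
> >>>>>>>>>>>>>>> from if an implementation overrides neither method:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   default void flush() {
> >>>>>>>>>>>>>>>       commit(Collections.emptyMap()); // legacy callers redirected to commit()
> >>>>>>>>>>>>>>>   }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   default void commit(final Map<TopicPartition, Long> changelogOffsets) {
> >>>>>>>>>>>>>>>       flush();                        // legacy stores only implement flush()
> >>>>>>>>>>>>>>>   }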
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> C. This is described in the "Atomic Checkpointing" section.
> >>>>>> Offsets
> >>>>>>>>>>> are
> >>>>>>>>>>>>>>> stored in a separate RocksDB column family, which is
> >> guaranteed
> >>>>>> to
> >>>>>>>> be
> >>>>>>>>>>>>>>> atomically flushed to disk with all other column families.
> >> The
> >>>>>>>> issue
> >>>>>>>>>>> of
> >>>>>>>>>>>>>>> checkpoints being written to disk after commit causing
> >>>>>>>> inconsistency
> >>>>>>>>>>> if
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>> crashes in between is the reason why, under EOS, checkpoint
> >>>> files
> >>>>>>>> are
> >>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>> written on clean shutdown. This is one of the major causes
> of
> >>>>>> "full
> >>>>>>>>>>>>>>> restorations", so moving the offsets into a place where
> they
> >>>> can
> >>>>>> be
> >>>>>>>>>>>>>>> guaranteed to be atomically written with the data they
> >>>> checkpoint
> >>>>>>>>>>>>>> allows us
> >>>>>>>>>>>>>>> to write the checkpoint offsets *on every commit*, not just
> >> on
> >>>>>>>> clean
> >>>>>>>>>>>>>>> shutdown.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <
> >>>> c...@littlehorse.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thank you for continuing this work. I have a few minor
> >>>>>> clarifying
> >>>>>>>>>>>>>>>> questions.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> A) "Records written to any transaction are visible to all
> >>>> other
> >>>>>>>>>>>>>>>> transactions immediately." I am confused here—I thought
> >> there
> >>>>>>>> could
> >>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> one transaction going on at a time for a given state store
> >>>> given
> >>>>>>>> the
> >>>>>>>>>>>>>>>> threading model for processing records on a Task. Do you
> >> mean
> >>>>>>>>>>>>>> Interactive
> >>>>>>>>>>>>>>>> Queries by "other transactions"? (If so, then everything
> >> makes
> >>>>>>>>>>> sense—I
> >>>>>>>>>>>>>>>> thought that since IQ were read-only then they didn't
> count
> >> as
> >>>>>>>>>>>>>>>> transactions).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> B) Is it intentional that the default implementations of
> the
> >>>>>>>> flush()
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> commit() methods in the StateStore class refer to each
> other
> >>>> in
> >>>>>>>> some
> >>>>>>>>>>>>>> sort
> >>>>>>>>>>>>>>>> of unbounded recursion?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> C) How will the getCommittedOffset() method work? At
> first I
> >>>>>>>> thought
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> way to do it would be using a special key in the RocksDB
> >> store
> >>>>>> to
> >>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> offset, and committing that with the transaction. But upon
> >>>>>> second
> >>>>>>>>>>>>>>> thought,
> >>>>>>>>>>>>>>>> since restoration from the changelog is an idempotent
> >>>>>> procedure, I
> >>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> would be fine to 1) commit the RocksDB transaction and
> then
> >> 2)
> >>>>>>>> write
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> offset to disk in a checkpoint file. If there is a crash
> >>>> between
> >>>>>>>> 1)
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>> 2),
> >>>>>>>>>>>>>>>> I think the only downside is now we replay a few more
> >> records
> >>>>>> (at
> >>>>>>>> a
> >>>>>>>>>>>>>> cost
> >>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> <100ms). Am I missing something there?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Other than that, everything makes sense to me.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford <
> >>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I've updated the KIP to reflect the latest version of the
> >>>>>> design:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> There are several changes in there that reflect feedback
> >> from
> >>>>>>>> this
> >>>>>>>>>>>>>>>> thread,
> >>>>>>>>>>>>>>>>> and there's a new section and a bunch of interface
> changes
> >>>>>>>> relating
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> Atomic Checkpointing, which is the final piece of the
> >> puzzle
> >>>> to
> >>>>>>>>>>>>>> making
> >>>>>>>>>>>>>>>>> everything robust.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Let me know what you think!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford <
> >>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Lucas,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for looking over my KIP.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a
> >> typo
> >>>> in
> >>>>>>>> the
> >>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>> that I've now corrected. It was originally per-Task,
> but I
> >>>>>>>>>>>>>> changed it
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> per-instance for exactly the reason you highlighted.
> >>>>>>>>>>>>>>>>>> B) It's worth noting that transactionality is only
> enabled
> >>>>>> under
> >>>>>>>>>>>>>> EOS,
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> in the default mode of operation (ALOS), there should be
> >> no
> >>>>>>>>>>>>>> change in
> >>>>>>>>>>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the
> >>>>>> impact
> >>>>>>>> on
> >>>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>> sufficiently low default values for the memory bound
> >>>>>>>>>>>>>> configuration. I
> >>>>>>>>>>>>>>>>>> understand your hesitation to include a significant
> change
> >>>> of
> >>>>>>>>>>>>>>>> behaviour,
> >>>>>>>>>>>>>>>>>> especially in a minor release, but I suspect that most
> >> users
> >>>>>>>> will
> >>>>>>>>>>>>>>>> prefer
> >>>>>>>>>>>>>>>>>> the memory impact (under EOS) to the existing behaviour
> of
> >>>>>>>>>>>>>> frequent
> >>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>> restorations! If this is a problem, the changes can wait
> >>>> until
> >>>>>>>> the
> >>>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>>> major release. I'll be running a patched version of
> >> streams
> >>>> in
> >>>>>>>>>>>>>>>> production
> >>>>>>>>>>>>>>>>>> with these changes as soon as they're ready, so it won't
> >>>>>> disrupt
> >>>>>>>>>>>>>> me
> >>>>>>>>>>>>>>> :-D
> >>>>>>>>>>>>>>>>>> C) The main purpose of this sentence was just to note
> that
> >>>>>> some
> >>>>>>>>>>>>>>> changes
> >>>>>>>>>>>>>>>>>> will need to be made to the way Segments are handled in
> >>>> order
> >>>>>> to
> >>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>> they also benefit from transactions. At the time I wrote
> >>>> it, I
> >>>>>>>>>>>>>> hadn't
> >>>>>>>>>>>>>>>>>> figured out the specific changes necessary, so it was
> >>>>>>>> deliberately
> >>>>>>>>>>>>>>>> vague.
> >>>>>>>>>>>>>>>>>> This is the one outstanding problem I'm currently
> working
> >>>> on,
> >>>>>>>> and
> >>>>>>>>>>>>>>> I'll
> >>>>>>>>>>>>>>>>>> update this section with more detail once I have figured
> >> out
> >>>>>> the
> >>>>>>>>>>>>>>> exact
> >>>>>>>>>>>>>>>>>> changes required.
> >>>>>>>>>>>>>>>>>> D) newTransaction() provides the necessary isolation
> >>>>>> guarantees.
> >>>>>>>>>>>>>>> While
> >>>>>>>>>>>>>>>>>> the RocksDB implementation of transactions doesn't
> >>>> technically
> >>>>>>>>>>>>>> *need*
> >>>>>>>>>>>>>>>>>> read-only users to call newTransaction(), other
> >>>>>> implementations
> >>>>>>>>>>>>>>> (e.g. a
> >>>>>>>>>>>>>>>>>> hypothetical PostgresStore) may require it. Calling
> >>>>>>>>>>>>>> newTransaction()
> >>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>> no transaction is necessary is essentially free, as it
> >> will
> >>>>>> just
> >>>>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I
> think
> >> it
> >>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> fairly obvious where the performance problems stem from:
> >>>>>> writes
> >>>>>>>>>>>>>> under
> >>>>>>>>>>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it
> with
> >>>> the
> >>>>>>>>>>>>>>>>>> tombstone/record flag, 1 to decode it from the
> >>>>>> tombstone/record
> >>>>>>>>>>>>>> flag,
> >>>>>>>>>>>>>>>>> and 1
> >>>>>>>>>>>>>>>>>> to copy the record from the "temporary" store to the
> >> "main"
> >>>>>>>> store,
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> transaction commits. The different approach taken by
> >> KIP-869
> >>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>> perform
> >>>>>>>>>>>>>>>>>> much better, as it avoids all these copies, and may
> >> actually
> >>>>>>>>>>>>>> perform
> >>>>>>>>>>>>>>>>>> slightly better than trunk, due to batched writes in
> >> RocksDB
> >>>>>>>>>>>>>>> performing
> >>>>>>>>>>>>>>>>>> better than non-batched writes.[1]
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>
> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <
> >>>>>>>>>>>>>> lbruts...@confluent.io
> >>>>>>>>>>>>>>>>> .invalid>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I'm just starting to read up on the whole discussion
> >> about
> >>>>>>>>>>>>>> KIP-892
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think
> >>>>>>>>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do
> >> have
> >>>>>> some
> >>>>>>>>>>>>>>>>>>> questions about the latest draft.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>        A) If I understand correctly, you propose to
> put a
> >>>> bound
> >>>>>> on
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>> (native) memory consumed by each task. However, I
> wonder
> >> if
> >>>>>>>> this
> >>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> sufficient if we have temporary imbalances in the
> >> cluster.
> >>>>>> For
> >>>>>>>>>>>>>>>>>>> example, depending on the timing of rebalances during a
> >>>>>> cluster
> >>>>>>>>>>>>>>>>>>> restart, it could happen that a single streams node is
> >>>>>>>> assigned a
> >>>>>>>>>>>>>>> lot
> >>>>>>>>>>>>>>>>>>> more tasks than expected. With your proposed change,
> this
> >>>>>> would
> >>>>>>>>>>>>>> mean
> >>>>>>>>>>>>>>>>>>> that the memory required by this one node could be a
> >>>> multiple
> >>>>>>>> of
> >>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>> is required during normal operation. I wonder if it
> >>>> wouldn't
> >>>>>> be
> >>>>>>>>>>>>>>> safer
> >>>>>>>>>>>>>>>>>>> to put a global bound on the memory use, across all
> >> tasks.
> >>>>>>>>>>>>>>>>>>>        B) Generally, the memory concerns still give me
> the
> >>>>>> feeling
> >>>>>>>>>>> that
> >>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> should not be enabled by default for all users in a
> minor
> >>>>>>>>>>>>>> release.
> >>>>>>>>>>>>>>>>>>>        C) In section "Transaction Management": the
> >> sentence
> >>>> "A
> >>>>>>>> similar
> >>>>>>>>>>>>>>>>>>> analogue will be created to automatically manage
> >> `Segment`
> >>>>>>>>>>>>>>>>>>> transactions.". Maybe this is just me lacking some
> >>>>>> background,
> >>>>>>>>>>>>>> but I
> >>>>>>>>>>>>>>>>>>> do not understand this, it would be great if you could
> >>>>>> clarify
> >>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>>>> you mean here.
> >>>>>>>>>>>>>>>>>>>        D) Could you please clarify why IQ has to call
> >>>>>>>>>>> newTransaction(),
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>> it's read-only.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> And one last thing not strictly related to your KIP: if
> >>>> there
> >>>>>>>> is
> >>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x
> >>>>>> slower
> >>>>>>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>> by providing a flame graph), that would be quite
> >>>> interesting.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <
> >>>>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I've updated the KIP with a more detailed design,
> which
> >>>>>>>>>>>>>> reflects
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> implementation I've been working on:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> This new design should address the outstanding points
> >>>>>> already
> >>>>>>>>>>>>>> made
> >>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> thread.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Please let me know if there are areas that are unclear
> >> or
> >>>>>> need
> >>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>> clarification.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I have a (nearly) working implementation. I'm
> confident
> >>>> that
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> remaining
> >>>>>>>>>>>>>>>>>>>> work (making Segments behave) will not impact the
> >>>> documented
> >>>>>>>>>>>>>>> design.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy <
> >>>>>>>> c...@littlehorse.io
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was
> hoping
> >>>>>> that,
> >>>>>>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the
> >>>>>>>>>>>>>> documented
> >>>>>>>>>>>>>>>> API,
> >>>>>>>>>>>>>>>>>>> maybe
> >>>>>>>>>>>>>>>>>>>>> you *wouldn't* have to wait for the next major
> release
> >> to
> >>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> change;
> >>>>>>>>>>>>>>>>>>>>> but given that it would be considered a major
> change, I
> >>>>>> like
> >>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>>>>>>>> the best.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding!
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <
> >>>>>>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally
> intended
> >>>> to
> >>>>>>>>>>>>>> try
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> keep the
> >>>>>>>>>>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise
> >> we'd
> >>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> wait for
> >>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>> major version release to land these changes.
> >>>>>>>>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of
> >>>>>>>>>>>>>> guarantee,
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> typically longer commit intervals would be
> problematic
> >>>>>> when
> >>>>>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>> "committed" records.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor
> >> surgery,
> >>>>>>>>>>>>>> but I
> >>>>>>>>>>>>>>>>>>> spent a
> >>>>>>>>>>>>>>>>>>>>>> considerable amount of that time working through
> ideas
> >>>> for
> >>>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>>>>> solutions in my head. I think your suggestion of
> >> keeping
> >>>>>>>>>>>>>> ALOS
> >>>>>>>>>>>>>>>>>>> as-is, but
> >>>>>>>>>>>>>>>>>>>>>> buffering writes for EOS is the right path forwards,
> >>>>>>>>>>>>>> although
> >>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>> have a
> >>>>>>>>>>>>>>>>>>>>>> solution that both expands on this, and provides for
> >>>> some
> >>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>> formal
> >>>>>>>>>>>>>>>>>>>>>> guarantees.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for
> >>>>>>>>>>>>>>>> "Transactions",
> >>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read
> >> Committed"
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>> under
> >>>>>>>>>>>>>>>>>>> EOS,
> >>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> "Read Uncommitted" under ALOS.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> The nice thing about this approach is that it gives
> us
> >>>>>> much
> >>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>> clearly
> >>>>>>>>>>>>>>>>>>>>>> defined isolation behaviour that can be properly
> >>>>>>>>>>>>>> documented to
> >>>>>>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>>>>>>>>> know what to expect.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and
> >> will
> >>>>>>>>>>>>>> update
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>> I have something. The main struggle is trying to
> >>>> implement
> >>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>>>> making any major changes to the existing interfaces
> or
> >>>>>>>>>>>>>>> breaking
> >>>>>>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>>>> implementations, because currently everything
> expects
> >> to
> >>>>>>>>>>>>>>> operate
> >>>>>>>>>>>>>>>>>>> directly
> >>>>>>>>>>>>>>>>>>>>>> on a StateStore, and not a Transaction of that
> store.
> >> I
> >>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>>>> getting
> >>>>>>>>>>>>>>>>>>>>>> close, although sadly I won't be able to progress
> much
> >>>>>>>>>>>>>> until
> >>>>>>>>>>>>>>>> next
> >>>>>>>>>>>>>>>>>>> week
> >>>>>>>>>>>>>>>>>>>>> due
> >>>>>>>>>>>>>>>>>>>>>> to some work commitments.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy <
> >>>>>>>>>>>>>>> c...@littlehorse.io>
> >>>>>
