Hi John,

By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
class called "RecordCache"...

Cheers,

Nick

On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org> wrote:

> Hi Nick,
>
> Thanks for picking this up again!
>
> I did have one new thought over the intervening months, which I'd like
> your take on.
>
> What if, instead of using the RocksDB atomic write primitive at all, we
> instead just:
> 1. disable memtables entirely
> 2. directly write the RecordCache into SST files when we flush
> 3. atomically ingest the SST file(s) into RocksDB when we get the ACK
> from the changelog (see
>
> https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
> and
>
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
> and
>
> https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
> )
> 4. track the changelog offsets either in another CF or the same CF with
> a reserved key, either of which will make the changelog offset update
> atomic with the file ingestions
>
> I suspect this'll have a number of benefits:
> * writes to RocksDB will always be atomic
> * we don't fragment memory between the RecordCache and the memtables
> * RecordCache gives far higher performance than memtable for reads and
> writes
> * we don't need any new "transaction" concepts or memory bound configs
>
> What do you think?
>
> Thanks,
> -John
>
> On 6/20/23 10:51, Nick Telford wrote:
> > Hi Bruno,
> >
> > Thanks for reviewing the KIP. It's been a long road, I started working on
> > this more than a year ago, and most of the time in the last 6 months has
> > been spent on the "Atomic Checkpointing" stuff that's been benched, so
> some
> > of the reasoning behind some of my decisions have been lost, but I'll do
> my
> > best to reconstruct them.
> >
> > 1.
> > IIRC, this was the initial approach I tried. I don't remember the exact
> > reasons I changed it to use a separate "view" of the StateStore that
> > encapsulates the transaction, but I believe it had something to do with
> > concurrent access to the StateStore from Interactive Query threads. Reads
> > from interactive queries need to be isolated from the currently ongoing
> > transaction, both for consistency (so interactive queries don't observe
> > changes that are subsequently rolled-back), but also to prevent Iterators
> > opened by an interactive query from being closed and invalidated by the
> > StreamThread when it commits the transaction, which causes your
> interactive
> > queries to crash.
> >
> > Another reason I believe I implemented it this way was a separation of
> > concerns. Recall that newTransaction() originally created an object of
> type
> > Transaction, not StateStore. My intent was to improve the type-safety of
> > the API, in an effort to ensure Transactions weren't used incorrectly.
> > Unfortunately, this didn't pan out, but newTransaction() remained.
> >
> > Finally, this had the added benefit that implementations could easily add
> > support for transactions *without* re-writing their existing,
> > non-transactional implementation. I think this can be a benefit both for
> > implementers of custom StateStores, but also for anyone extending
> > RocksDbStore, as they can rely on the existing access methods working how
> > they expect them to.
> >
> > I'm not too happy with the way the current design has panned out, so I'm
> > open to ideas on how to improve it. Key to this is finding some way to
> > ensure that reads from Interactive Query threads are properly isolated
> from
> > the transaction, *without* the performance overhead of checking which
> > thread the method is being called from on every access.
> >
> > As for replacing flush() with commit() - I saw no reason to add this
> > complexity to the KIP, unless there was a need to add arguments to the
> > flush/commit method. This need arises with Atomic Checkpointing, but that
> > will be implemented separately, in a future KIP. Do you see a need for
> some
> > arguments to the flush/commit method that I've missed? Or were you simply
> > suggesting a rename?
> >
> > 2.
> > This is simply due to the practical reason that isolationLevel() is
> really
> > a proxy for checking if the app is under EOS. The application
> configuration
> > is not provided to the constructor of StateStores, but it *is* provided
> to
> > init(), via StateStoreContext. For this reason, it seemed somewhat
> natural
> > to add it to StateStoreContext. I think this makes sense, since the
> > IsolationLevel of all StateStores in an application *must* be the same,
> and
> > since those stores are all initialized with the same StateStoreContext,
> it
> > seems natural for that context to carry the desired IsolationLevel to
> use.
> >
> > 3.
> > Using IsolationLevel instead of just passing `boolean eosEnabled`, like
> > much of the internals was an attempt to logically de-couple the
> StateStore
> > API from the internals of Kafka Streams. Technically, StateStores don't
> > need to know/care what processing mode the KS app is using, all they need
> > to know is the isolation level expected of them.
> >
> > Having formal definitions for the expectations of the two required
> > IsolationLevels allow implementers to implement transactional stores
> > without having to dig through the internals of Kafka Streams and
> understand
> > exactly how they are used. The tight coupling between state stores and
> > internal behaviour has actually significantly hindered my progress on
> this
> > KIP, and encouraged me to avoid increasing this logical coupling as much
> as
> > possible.
> >
> > This also frees implementations to satisfy those requirements in any way
> > they choose. Transactions might not be the only/available approach to an
> > implementation, but they might have an alternative way to satisfy the
> > isolation requirements. I admit that this point is more about semantics,
> > but "transactional" would need to be formally defined in order for
> > implementers to provide a valid implementation, and these IsolationLevels
> > provide that formal definition.
> >
> > 4.
> > I can remove them. I added them only as I planned to include them in the
> > org.apache.kafka.streams.state package, as a recommended base
> > implementation for all StateStores, including those implemented by
> users. I
> > had assumed that anything in "public" packages, such as
> > org.apache.kafka.streams.state, should be included in a KIP. Is that
> wrong?
> >
> > 5.
> > RocksDB provides no way to measure the actual size of a
> > WriteBatch(WithIndex), so we're limited to tracking the sum total of the
> > size of keys + values that are written to the transaction. This obviously
> > under-estimates the actual memory usage, because WriteBatch no-doubt
> > includes some record overheads, and WriteBatchWithIndex has to maintain
> an
> > index.
> >
> > Ideally, we could trivially add a method upstream to WriteBatchInterface
> > that provides the exact size of the batch, but that would require an
> > upgrade of RocksDB, which won't happen soon. So for the time being, we're
> > stuck with an approximation, so I felt that the new method should reflect
> > that.
> >
> > Would you prefer the new method name ignores this constraint and that we
> > simply make the rocks measurement more accurate in the future?
> >
> > 6.
> > Done
> >
> > 7.
> > Very good point. The KIP already specifically calls out memory in the
> > documentation of the config: "Maximum number of memory bytes to be used
> to
> > buffer uncommitted state-store records." - did you have something else in
> > mind?
> >
> > Should we also make this clearer by renaming the config property itself?
> > Perhaps to something like statestore.transaction.buffer.max.bytes?
> >
> > 8.
> > OK, I can remove this. The intent here was to describe how Streams itself
> > will manage transaction roll-over etc. Presumably that means we also
> don't
> > need a description of how Streams will manage the commit of changelog
> > transactions, state store transactions and checkpointing?
> >
> > 9.
> > What do you mean by fail-over? Do you mean failing over an Active Task to
> > an instance already hosting a Standby Task?
> >
> > Thanks again and sorry for the essay of a response!
> >
> > Regards,
> > Nick
> >
> > On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Nick,
> >>
> >> Thanks for the updates!
> >>
> >> I really appreciate that you simplified the KIP by removing some
> >> aspects. As I have already told you, I think the removed aspects are
> >> also good ideas and we can discuss them on follow-up KIPs.
> >>
> >> Regarding the current KIP, I have the following feedback.
> >>
> >> 1.
> >> Is there a good reason to add method newTransaction() to the StateStore
> >> interface? As far as I understand, the idea is that users of a state
> >> store (transactional or not) call this method at start-up and after each
> >> commit. Since the call to newTransaction() is done in any case and I
> >> think it would simplify the caller code if we just start a new
> >> transaction after a commit in the implementation?
> >> As far as I understand, you plan to commit the transaction in the
> >> flush() method. I find the idea to replace flush() with commit()
> >> presented in KIP-844 an elegant solution.
> >>
> >> 2.
> >> Why is the method to query the isolation level added to the state store
> >> context?
> >>
> >> 3.
> >> Do we need all the isolation level definitions? I think it is good to
> >> know the guarantees of the transactionality of the state store.
> >> However, currently, Streams guarantees that there will only be one
> >> transaction that writes to the state store. Only the stream thread that
> >> executes the active task that owns the state store will write to the
> >> state store. I think it should be enough to know if the state store is
> >> transactional or not. So my proposal would be to just add a method on
> >> the state store interface the returns if a state store is transactional
> >> or not by returning a boolean or an enum.
> >>
> >> 4.
> >> I am wondering why AbstractTransaction and AbstractTransactionalStore
> >> are part of the KIP. They look like implementation details that should
> >> not be exposed in the public API.
> >>
> >> 5.
> >> Why does StateStore#approximateNumUncommittedBytes() return an
> >> approximate number of bytes?
> >>
> >> 6.
> >> RocksDB is just one implementation of the state stores in Streams.
> >> However, the issues regarding OOM errors might also apply to other
> >> custom implementations. So in the KIP I would extract that part from
> >> section "RocksDB Transaction". I would also move section "RocksDB
> >> Transaction" to the end of section "Proposed Changes" and handle it as
> >> an example implementation for a state store.
> >>
> >> 7.
> >> Should statestore.uncommitted.max.bytes only limit the uncommitted bytes
> >> or the uncommitted bytes that reside in memory? In future, other
> >> transactional state store implementations might implement a buffer for
> >> uncommitted records that are able to spill records on disk. I think
> >> statestore.uncommitted.max.bytes needs to limit the uncommitted bytes
> >> irrespective if they reside in memory or disk. Since Streams will use
> >> this config to decide if it needs to trigger a commit, state store
> >> implementations that can spill to disk will never be able to spill to
> >> disk. You would only need to change the doc of the config, if you agree
> >> with me.
> >>
> >> 8.
> >> Section "Transaction Management" about the wrappers is rather a
> >> implementation detail that should not be in the KIP.
> >>
> >> 9.
> >> Could you add a section that describes how failover will work with the
> >> transactional state stores? I think section "Error handling" is already
> >> a good start.
> >>
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >>
> >>
> >> On 15.05.23 11:04, Nick Telford wrote:
> >>> Hi everyone,
> >>>
> >>> Quick update: I've added a new section to the KIP: "Offsets for
> Consumer
> >>> Rebalances", that outlines my solution to the problem that
> >>> StreamsPartitionAssignor needs to read StateStore offsets even if
> they're
> >>> not currently open.
> >>>
> >>> Regards,
> >>> Nick
> >>>
> >>> On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com>
> >> wrote:
> >>>
> >>>> Hi Bruno,
> >>>>
> >>>> Thanks for reviewing my proposal.
> >>>>
> >>>> 1.
> >>>> The main reason I added it was because it was easy to do. If we see no
> >>>> value in it, I can remove it.
> >>>>
> >>>> 2.
> >>>> Global StateStores can have multiple partitions in their input topics
> >>>> (which function as their changelogs), so they would have more than one
> >>>> partition.
> >>>>
> >>>> 3.
> >>>> That's a good point. At present, the only method it adds is
> >>>> isolationLevel(), which is likely not necessary outside of
> StateStores.
> >>>> It *does* provide slightly different guarantees in the documentation
> to
> >>>> several of the methods (hence the overrides). I'm not sure if this is
> >>>> enough to warrant a new interface though.
> >>>> I think the question that remains is whether this interface makes it
> >>>> easier to implement custom transactional StateStores than if we were
> to
> >>>> remove it? Probably not.
> >>>>
> >>>> 4.
> >>>> The main motivation for the Atomic Checkpointing is actually
> >> performance.
> >>>> My team has been testing out an implementation of this KIP without it,
> >> and
> >>>> we had problems with RocksDB doing *much* more compaction, due to the
> >>>> significantly increased flush rate. It was enough of a problem that
> (for
> >>>> the time being), we had to revert back to Kafka Streams proper.
> >>>> I think the best way to solve this, as you say, is to keep the
> >> .checkpoint
> >>>> files *in addition* to the offsets being stored within the store
> itself.
> >>>> Essentially, when closing StateStores, we force a memtable flush, then
> >>>> call getCommittedOffsets and write those out to the .checkpoint file.
> >>>> That would ensure the metadata is available to the
> >>>> StreamsPartitionAssignor for all closed stores.
> >>>> If there's a crash (no clean close), then we won't be able to
> guarantee
> >>>> which offsets were flushed to disk by RocksDB, so we'd need to open (
> >>>> init()), read offsets, and then close() those stores. But since this
> is
> >>>> the exception, and will only occur once (provided it doesn't crash
> every
> >>>> time!), I think the performance impact here would be acceptable.
> >>>>
> >>>> Thanks for the feedback, please let me know if you have any more
> >> comments
> >>>> or questions!
> >>>>
> >>>> I'm currently working on rebasing against trunk. This involves adding
> >>>> support for transactionality to VersionedStateStores. I will probably
> >> need
> >>>> to revise my implementation for transactional "segmented" stores, both
> >> to
> >>>> accommodate VersionedStateStore, and to clean up some other stuff.
> >>>>
> >>>> Regards,
> >>>> Nick
> >>>>
> >>>>
> >>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org>
> wrote:
> >>>>
> >>>>> Hi Nick,
> >>>>>
> >>>>> Thanks for the updates!
> >>>>>
> >>>>> I have a couple of questions/comments.
> >>>>>
> >>>>> 1.
> >>>>> Why do you propose a configuration that involves max. bytes and max.
> >>>>> reords? I think we are mainly concerned about memory consumption
> >> because
> >>>>> we want to limit the off-heap memory used. I cannot think of a case
> >>>>> where one would want to set the max. number of records.
> >>>>>
> >>>>>
> >>>>> 2.
> >>>>> Why does
> >>>>>
> >>>>>     default void commit(final Map<TopicPartition, Long>
> >> changelogOffsets) {
> >>>>>         flush();
> >>>>>     }
> >>>>>
> >>>>> take a map of partitions to changelog offsets?
> >>>>> The mapping between state stores to partitions is a 1:1 relationship.
> >>>>> Passing in a single changelog offset should suffice.
> >>>>>
> >>>>>
> >>>>> 3.
> >>>>> Why do we need the Transaction interface? It should be possible to
> hide
> >>>>> beginning and committing a transactions withing the state store
> >>>>> implementation, so that from outside the state store, it does not
> >> matter
> >>>>> whether the state store is transactional or not. What would be the
> >>>>> advantage of using the Transaction interface?
> >>>>>
> >>>>>
> >>>>> 4.
> >>>>> Regarding checkpointing offsets, I think we should keep the
> checkpoint
> >>>>> file in any case for the reason you mentioned about rebalancing. Even
> >> if
> >>>>> that would not be an issue, I would propose to move the change to
> >> offset
> >>>>> management to a new KIP and to not add more complexity than needed to
> >>>>> this one. I would not be too concerned about the consistency
> violation
> >>>>> you mention. As far as I understand, with transactional state stores
> >>>>> Streams would write the checkpoint file during every commit even
> under
> >>>>> EOS. In the failure case you describe, Streams would restore the
> state
> >>>>> stores from the offsets found in the checkpoint file written during
> the
> >>>>> penultimate commit instead of during the last commit. Basically,
> >> Streams
> >>>>> would overwrite the records written to the state store between the
> last
> >>>>> two commits with the same records read from the changelogs. While I
> >>>>> understand that this is wasteful, it is -- at the same time --
> >>>>> acceptable and most importantly it does not break EOS.
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>>
> >>>>> On 27.04.23 12:34, Nick Telford wrote:
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> I find myself (again) considering removing the offset management
> from
> >>>>>> StateStores, and keeping the old checkpoint file system. The reason
> is
> >>>>> that
> >>>>>> the StreamPartitionAssignor directly reads checkpoint files in order
> >> to
> >>>>>> determine which instance has the most up-to-date copy of the local
> >>>>> state.
> >>>>>> If we move offsets into the StateStore itself, then we will need to
> >>>>> open,
> >>>>>> initialize, read offsets and then close each StateStore (that is not
> >>>>>> already assigned and open) for which we have *any* local state, on
> >> every
> >>>>>> rebalance.
> >>>>>>
> >>>>>> Generally, I don't think there are many "orphan" stores like this
> >>>>> sitting
> >>>>>> around on most instances, but even a few would introduce additional
> >>>>> latency
> >>>>>> to an already somewhat lengthy rebalance procedure.
> >>>>>>
> >>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping things
> >> in
> >>>>> the
> >>>>>> checkpoint file(s) for now, and not worrying about the race. The
> >>>>> downside
> >>>>>> is that we wouldn't be able to remove the explicit RocksDB flush
> >>>>> on-commit,
> >>>>>> which likely hurts performance.
> >>>>>>
> >>>>>> If anyone has any thoughts or ideas on this subject, I would
> >> appreciate
> >>>>> it!
> >>>>>>
> >>>>>> Regards,
> >>>>>> Nick
> >>>>>>
> >>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi Colt,
> >>>>>>>
> >>>>>>> The issue is that if there's a crash between 2 and 3, then you
> still
> >>>>> end
> >>>>>>> up with inconsistent data in RocksDB. The only way to guarantee
> that
> >>>>> your
> >>>>>>> checkpoint offsets and locally stored data are consistent with each
> >>>>> other
> >>>>>>> are to atomically commit them, which can be achieved by having the
> >>>>> offsets
> >>>>>>> stored in RocksDB.
> >>>>>>>
> >>>>>>> The offsets column family is likely to be extremely small (one
> >>>>>>> per-changelog partition + one per Topology input partition for
> >> regular
> >>>>>>> stores, one per input partition for global stores). So the overhead
> >>>>> will be
> >>>>>>> minimal.
> >>>>>>>
> >>>>>>> A major benefit of doing this is that we can remove the explicit
> >> calls
> >>>>> to
> >>>>>>> db.flush(), which forcibly flushes memtables to disk on-commit. It
> >>>>> turns
> >>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka Streams
> >>>>>>> commits, *not* RocksDB configuration, which could be a major source
> >> of
> >>>>>>> confusion. Atomic checkpointing makes it safe to remove these
> >> explicit
> >>>>>>> flushes, because it no longer matters exactly when RocksDB flushes
> >>>>> data to
> >>>>>>> disk; since the data and corresponding checkpoint offsets will
> always
> >>>>> be
> >>>>>>> flushed together, the local store is always in a consistent state,
> >> and
> >>>>>>> on-restart, it can always safely resume restoration from the
> on-disk
> >>>>>>> offsets, restoring the small amount of data that hadn't been
> flushed
> >>>>> when
> >>>>>>> the app exited/crashed.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Nick
> >>>>>>>
> >>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Nick,
> >>>>>>>>
> >>>>>>>> Thanks for your reply. Ack to A) and B).
> >>>>>>>>
> >>>>>>>> For item C), I see what you're referring to. Your proposed
> solution
> >>>>> will
> >>>>>>>> work, so no need to change it. What I was suggesting was that it
> >>>>> might be
> >>>>>>>> possible to achieve this with only one column family. So long as:
> >>>>>>>>
> >>>>>>>>       - No uncommitted records (i.e. not committed to the
> changelog)
> >> are
> >>>>>>>>       *committed* to the state store, AND
> >>>>>>>>       - The Checkpoint offset (which refers to the changelog
> topic)
> >> is
> >>>>> less
> >>>>>>>>       than or equal to the last written changelog offset in
> rocksdb
> >>>>>>>>
> >>>>>>>> I don't see the need to do the full restoration from scratch. My
> >>>>>>>> understanding was that prior to 844/892, full restorations were
> >>>>> required
> >>>>>>>> because there could be uncommitted records written to RocksDB;
> >>>>> however,
> >>>>>>>> given your use of RocksDB transactions, that can be avoided with
> the
> >>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB
> >>>>> transaction, 3)
> >>>>>>>> update offset in checkpoint file.
> >>>>>>>>
> >>>>>>>> Anyways, your proposed solution works equivalently and I don't
> >> believe
> >>>>>>>> there is much overhead to an additional column family in RocksDB.
> >>>>> Perhaps
> >>>>>>>> it may even perform better than making separate writes to the
> >>>>> checkpoint
> >>>>>>>> file.
> >>>>>>>>
> >>>>>>>> Colt McNealy
> >>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <
> >> nick.telf...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Colt,
> >>>>>>>>>
> >>>>>>>>> A. I've done my best to de-couple the StateStore stuff from the
> >> rest
> >>>>> of
> >>>>>>>> the
> >>>>>>>>> Streams engine. The fact that there will be only one ongoing
> >> (write)
> >>>>>>>>> transaction at a time is not guaranteed by any API, and is just a
> >>>>>>>>> consequence of the way Streams operates. To that end, I tried to
> >>>>> ensure
> >>>>>>>> the
> >>>>>>>>> documentation and guarantees provided by the new APIs are
> >>>>> independent of
> >>>>>>>>> this incidental behaviour. In practice, you're right, this
> >>>>> essentially
> >>>>>>>>> refers to "interactive queries", which are technically "read
> >>>>>>>> transactions",
> >>>>>>>>> even if they don't actually use the transaction API to isolate
> >>>>>>>> themselves.
> >>>>>>>>>
> >>>>>>>>> B. Yes, although not ideal. This is for backwards compatibility,
> >>>>>>>> because:
> >>>>>>>>>        1) Existing custom StateStore implementations will
> implement
> >>>>>>>> flush(),
> >>>>>>>>> and not commit(), but the Streams engine now calls commit(), so
> >> those
> >>>>>>>> calls
> >>>>>>>>> need to be forwarded to flush() for these legacy stores.
> >>>>>>>>>        2) Existing StateStore *users*, i.e. outside of the
> Streams
> >>>>> engine
> >>>>>>>>> itself, may depend on explicitly calling flush(), so for these
> >> cases,
> >>>>>>>>> flush() needs to be redirected to call commit().
> >>>>>>>>> If anyone has a better way to guarantee compatibility without
> >>>>>>>> introducing
> >>>>>>>>> this potential recursion loop, I'm open to changes!
> >>>>>>>>>
> >>>>>>>>> C. This is described in the "Atomic Checkpointing" section.
> Offsets
> >>>>> are
> >>>>>>>>> stored in a separate RocksDB column family, which is guaranteed
> to
> >> be
> >>>>>>>>> atomically flushed to disk with all other column families. The
> >> issue
> >>>>> of
> >>>>>>>>> checkpoints being written to disk after commit causing
> >> inconsistency
> >>>>> if
> >>>>>>>> it
> >>>>>>>>> crashes in between is the reason why, under EOS, checkpoint files
> >> are
> >>>>>>>> only
> >>>>>>>>> written on clean shutdown. This is one of the major causes of
> "full
> >>>>>>>>> restorations", so moving the offsets into a place where they can
> be
> >>>>>>>>> guaranteed to be atomically written with the data they checkpoint
> >>>>>>>> allows us
> >>>>>>>>> to write the checkpoint offsets *on every commit*, not just on
> >> clean
> >>>>>>>>> shutdown.
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Nick
> >>>>>>>>>
> >>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io>
> >>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Nick,
> >>>>>>>>>>
> >>>>>>>>>> Thank you for continuing this work. I have a few minor
> clarifying
> >>>>>>>>>> questions.
> >>>>>>>>>>
> >>>>>>>>>> A) "Records written to any transaction are visible to all other
> >>>>>>>>>> transactions immediately." I am confused here—I thought there
> >> could
> >>>>>>>> only
> >>>>>>>>> be
> >>>>>>>>>> one transaction going on at a time for a given state store given
> >> the
> >>>>>>>>>> threading model for processing records on a Task. Do you mean
> >>>>>>>> Interactive
> >>>>>>>>>> Queries by "other transactions"? (If so, then everything makes
> >>>>> sense—I
> >>>>>>>>>> thought that since IQ were read-only then they didn't count as
> >>>>>>>>>> transactions).
> >>>>>>>>>>
> >>>>>>>>>> B) Is it intentional that the default implementations of the
> >> flush()
> >>>>>>>> and
> >>>>>>>>>> commit() methods in the StateStore class refer to each other in
> >> some
> >>>>>>>> sort
> >>>>>>>>>> of unbounded recursion?
> >>>>>>>>>>
> >>>>>>>>>> C) How will the getCommittedOffset() method work? At first I
> >> thought
> >>>>>>>> the
> >>>>>>>>>> way to do it would be using a special key in the RocksDB store
> to
> >>>>>>>> store
> >>>>>>>>> the
> >>>>>>>>>> offset, and committing that with the transaction. But upon
> second
> >>>>>>>>> thought,
> >>>>>>>>>> since restoration from the changelog is an idempotent
> procedure, I
> >>>>>>>> think
> >>>>>>>>> it
> >>>>>>>>>> would be fine to 1) commit the RocksDB transaction and then 2)
> >> write
> >>>>>>>> the
> >>>>>>>>>> offset to disk in a checkpoint file. If there is a crash between
> >> 1)
> >>>>>>>> and
> >>>>>>>>> 2),
> >>>>>>>>>> I think the only downside is now we replay a few more records
> (at
> >> a
> >>>>>>>> cost
> >>>>>>>>> of
> >>>>>>>>>> <100ms). Am I missing something there?
> >>>>>>>>>>
> >>>>>>>>>> Other than that, everything makes sense to me.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Colt McNealy
> >>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford <
> >>>>> nick.telf...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>
> >>>>>>>>>>> I've updated the KIP to reflect the latest version of the
> design:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>
> >>>>>>>>>>> There are several changes in there that reflect feedback from
> >> this
> >>>>>>>>>> thread,
> >>>>>>>>>>> and there's a new section and a bunch of interface changes
> >> relating
> >>>>>>>> to
> >>>>>>>>>>> Atomic Checkpointing, which is the final piece of the puzzle to
> >>>>>>>> making
> >>>>>>>>>>> everything robust.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know what you think!
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Nick
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford <
> >> nick.telf...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Lucas,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for looking over my KIP.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a typo in
> >> the
> >>>>>>>>> KIP
> >>>>>>>>>>>> that I've now corrected. It was originally per-Task, but I
> >>>>>>>> changed it
> >>>>>>>>>> to
> >>>>>>>>>>>> per-instance for exactly the reason you highlighted.
> >>>>>>>>>>>> B) It's worth noting that transactionality is only enabled
> under
> >>>>>>>> EOS,
> >>>>>>>>>> and
> >>>>>>>>>>>> in the default mode of operation (ALOS), there should be no
> >>>>>>>> change in
> >>>>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the
> impact
> >> on
> >>>>>>>>>> users
> >>>>>>>>>>> by
> >>>>>>>>>>>> sufficiently low default values for the memory bound
> >>>>>>>> configuration. I
> >>>>>>>>>>>> understand your hesitation to include a significant change of
> >>>>>>>>>> behaviour,
> >>>>>>>>>>>> especially in a minor release, but I suspect that most users
> >> will
> >>>>>>>>>> prefer
> >>>>>>>>>>>> the memory impact (under EOS) to the existing behaviour of
> >>>>>>>> frequent
> >>>>>>>>>> state
> >>>>>>>>>>>> restorations! If this is a problem, the changes can wait until
> >> the
> >>>>>>>>> next
> >>>>>>>>>>>> major release. I'll be running a patched version of streams in
> >>>>>>>>>> production
> >>>>>>>>>>>> with these changes as soon as they're ready, so it won't
> disrupt
> >>>>>>>> me
> >>>>>>>>> :-D
> >>>>>>>>>>>> C) The main purpose of this sentence was just to note that
> some
> >>>>>>>>> changes
> >>>>>>>>>>>> will need to be made to the way Segments are handled in order
> to
> >>>>>>>>> ensure
> >>>>>>>>>>>> they also benefit from transactions. At the time I wrote it, I
> >>>>>>>> hadn't
> >>>>>>>>>>>> figured out the specific changes necessary, so it was
> >> deliberately
> >>>>>>>>>> vague.
> >>>>>>>>>>>> This is the one outstanding problem I'm currently working on,
> >> and
> >>>>>>>>> I'll
> >>>>>>>>>>>> update this section with more detail once I have figured out
> the
> >>>>>>>>> exact
> >>>>>>>>>>>> changes required.
> >>>>>>>>>>>> D) newTransaction() provides the necessary isolation
> guarantees.
> >>>>>>>>> While
> >>>>>>>>>>>> the RocksDB implementation of transactions doesn't technically
> >>>>>>>> *need*
> >>>>>>>>>>>> read-only users to call newTransaction(), other
> implementations
> >>>>>>>>> (e.g. a
> >>>>>>>>>>>> hypothetical PostgresStore) may require it. Calling
> >>>>>>>> newTransaction()
> >>>>>>>>>> when
> >>>>>>>>>>>> no transaction is necessary is essentially free, as it will
> just
> >>>>>>>>> return
> >>>>>>>>>>>> this.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think it
> >>>>>>>> should
> >>>>>>>>> be
> >>>>>>>>>>>> fairly obvious where the performance problems stem from:
> writes
> >>>>>>>> under
> >>>>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with the
> >>>>>>>>>>>> tombstone/record flag, 1 to decode it from the
> tombstone/record
> >>>>>>>> flag,
> >>>>>>>>>>> and 1
> >>>>>>>>>>>> to copy the record from the "temporary" store to the "main"
> >> store,
> >>>>>>>>> when
> >>>>>>>>>>> the
> >>>>>>>>>>>> transaction commits. The different approach taken by KIP-869
> >>>>>>>> should
> >>>>>>>>>>> perform
> >>>>>>>>>>>> much better, as it avoids all these copies, and may actually
> >>>>>>>> perform
> >>>>>>>>>>>> slightly better than trunk, due to batched writes in RocksDB
> >>>>>>>>> performing
> >>>>>>>>>>>> better than non-batched writes.[1]
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Nick
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1:
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>
> >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <
> >>>>>>>> lbruts...@confluent.io
> >>>>>>>>>>> .invalid>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I'm just starting to read up on the whole discussion about
> >>>>>>>> KIP-892
> >>>>>>>>> and
> >>>>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think
> >>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do have
> some
> >>>>>>>>>>>>> questions about the latest draft.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     A) If I understand correctly, you propose to put a bound
> on
> >> the
> >>>>>>>>>>>>> (native) memory consumed by each task. However, I wonder if
> >> this
> >>>>>>>> is
> >>>>>>>>>>>>> sufficient if we have temporary imbalances in the cluster.
> For
> >>>>>>>>>>>>> example, depending on the timing of rebalances during a
> cluster
> >>>>>>>>>>>>> restart, it could happen that a single streams node is
> >> assigned a
> >>>>>>>>> lot
> >>>>>>>>>>>>> more tasks than expected. With your proposed change, this
> would
> >>>>>>>> mean
> >>>>>>>>>>>>> that the memory required by this one node could be a multiple
> >> of
> >>>>>>>>> what
> >>>>>>>>>>>>> is required during normal operation. I wonder if it wouldn't
> be
> >>>>>>>>> safer
> >>>>>>>>>>>>> to put a global bound on the memory use, across all tasks.
> >>>>>>>>>>>>>     B) Generally, the memory concerns still give me the
> feeling
> >>>>> that
> >>>>>>>>> this
> >>>>>>>>>>>>> should not be enabled by default for all users in a minor
> >>>>>>>> release.
> >>>>>>>>>>>>>     C) In section "Transaction Management": the sentence "A
> >> similar
> >>>>>>>>>>>>> analogue will be created to automatically manage `Segment`
> >>>>>>>>>>>>> transactions.". Maybe this is just me lacking some
> background,
> >>>>>>>> but I
> >>>>>>>>>>>>> do not understand this, it would be great if you could
> clarify
> >>>>>>>> what
> >>>>>>>>>>>>> you mean here.
> >>>>>>>>>>>>>     D) Could you please clarify why IQ has to call
> >>>>> newTransaction(),
> >>>>>>>>> when
> >>>>>>>>>>>>> it's read-only.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> And one last thing not strictly related to your KIP: if there
> >> is
> >>>>>>>> an
> >>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x
> slower
> >>>>>>>> (e.g.
> >>>>>>>>>>>>> by providing a flame graph), that would be quite interesting.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <
> >>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I've updated the KIP with a more detailed design, which
> >>>>>>>> reflects
> >>>>>>>>> the
> >>>>>>>>>>>>>> implementation I've been working on:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This new design should address the outstanding points
> already
> >>>>>>>> made
> >>>>>>>>>> in
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> thread.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Please let me know if there are areas that are unclear or
> need
> >>>>>>>>> more
> >>>>>>>>>>>>>> clarification.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I have a (nearly) working implementation. I'm confident that
> >>>>>>>> the
> >>>>>>>>>>>>> remaining
> >>>>>>>>>>>>>> work (making Segments behave) will not impact the documented
> >>>>>>>>> design.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy <
> >> c...@littlehorse.io
> >>>>>>>>>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was hoping
> that,
> >>>>>>>>>> since
> >>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the
> >>>>>>>> documented
> >>>>>>>>>> API,
> >>>>>>>>>>>>> maybe
> >>>>>>>>>>>>>>> you *wouldn't* have to wait for the next major release to
> >>>>>>>> make
> >>>>>>>>>> that
> >>>>>>>>>>>>> change;
> >>>>>>>>>>>>>>> but given that it would be considered a major change, I
> like
> >>>>>>>>> your
> >>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>> the best.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <
> >>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended to
> >>>>>>>> try
> >>>>>>>>> to
> >>>>>>>>>>>>> keep the
> >>>>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise we'd
> >>>>>>>> have
> >>>>>>>>> to
> >>>>>>>>>>>>> wait for
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> major version release to land these changes.
> >>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of
> >>>>>>>> guarantee,
> >>>>>>>>>> and
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> typically longer commit intervals would be problematic
> when
> >>>>>>>>>>> reading
> >>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>> "committed" records.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor surgery,
> >>>>>>>> but I
> >>>>>>>>>>>>> spent a
> >>>>>>>>>>>>>>>> considerable amount of that time working through ideas for
> >>>>>>>>>>> possible
> >>>>>>>>>>>>>>>> solutions in my head. I think your suggestion of keeping
> >>>>>>>> ALOS
> >>>>>>>>>>>>> as-is, but
> >>>>>>>>>>>>>>>> buffering writes for EOS is the right path forwards,
> >>>>>>>> although
> >>>>>>>>> I
> >>>>>>>>>>>>> have a
> >>>>>>>>>>>>>>>> solution that both expands on this, and provides for some
> >>>>>>>> more
> >>>>>>>>>>>>> formal
> >>>>>>>>>>>>>>>> guarantees.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for
> >>>>>>>>>> "Transactions",
> >>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read Committed"
> >>>>>>>> when
> >>>>>>>>>> under
> >>>>>>>>>>>>> EOS,
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> "Read Uncommitted" under ALOS.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The nice thing about this approach is that it gives us
> much
> >>>>>>>>> more
> >>>>>>>>>>>>> clearly
> >>>>>>>>>>>>>>>> defined isolation behaviour that can be properly
> >>>>>>>> documented to
> >>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>>> know what to expect.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and will
> >>>>>>>> update
> >>>>>>>>>> the
> >>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>> I have something. The main struggle is trying to implement
> >>>>>>>>> this
> >>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>> making any major changes to the existing interfaces or
> >>>>>>>>> breaking
> >>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>> implementations, because currently everything expects to
> >>>>>>>>> operate
> >>>>>>>>>>>>> directly
> >>>>>>>>>>>>>>>> on a StateStore, and not a Transaction of that store. I
> >>>>>>>> think
> >>>>>>>>>> I'm
> >>>>>>>>>>>>> getting
> >>>>>>>>>>>>>>>> close, although sadly I won't be able to progress much
> >>>>>>>> until
> >>>>>>>>>> next
> >>>>>>>>>>>>> week
> >>>>>>>>>>>>>>> due
> >>>>>>>>>>>>>>>> to some work commitments.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy <
> >>>>>>>>> c...@littlehorse.io>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Nick,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thank you for the explanation, and also for the updated
> >>>>>>>>> KIP. I
> >>>>>>>>>>> am
> >>>>>>>>>>>>> quite
> >>>>>>>>>>>>>>>>> eager for this improvement to be released as it would
> >>>>>>>>> greatly
> >>>>>>>>>>>>> reduce
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> operational difficulties of EOS streams apps.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Two questions:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 10)
> >>>>>>>>>>>>>>>>>> When reading records, we will use the
> >>>>>>>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB
> >>>>>>>>>>>>>>>>>     and WriteBatchWithIndex#newIteratorWithBase
> utilities in
> >>>>>>>>>> order
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>> that uncommitted writes are available to query.
> >>>>>>>>>>>>>>>>> Why do extra work to enable the reading of uncommitted
> >>>>>>>>> writes
> >>>>>>>>>>>>> during
> >>>>>>>>>>>>>>> IQ?
> >>>>>>>>>>>>>>>>> Code complexity aside, reading uncommitted writes is, in
> >>>>>>>> my
> >>>>>>>>>>>>> opinion, a
> >>>>>>>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have the
> >>>>>>>>>>> guarantee
> >>>>>>>>>>>>> that,
> >>>>>>>>>>>>>>>>> with EOS, IQ only reads committed records. In order to
> >>>>>>>> avoid
> >>>>>>>>>>> dirty
> >>>>>>>>>>>>>>> reads,
> >>>>>>>>>>>>>>>>> one currently must query a standby replica (but this
> >>>>>>>> still
> >>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>> fully
> >>>>>>>>>>>>>>>>> guarantee monotonic reads).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 20) Is it also necessary to enable this optimization on
> >>>>>>>> ALOS
> >>>>>>>>>>>>> stores?
> >>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>> motivation of KIP-844 was mainly to reduce the need to
> >>>>>>>>> restore
> >>>>>>>>>>>>> state
> >>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>> scratch on unclean EOS shutdowns; with ALOS it was
> >>>>>>>>> acceptable
> >>>>>>>>>> to
> >>>>>>>>>>>>> accept
> >>>>>>>>>>>>>>>>> that there may have been uncommitted writes on disk. On a
> >>>>>>>>> side
> >>>>>>>>>>>>> note, if
> >>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>> enable this type of store on ALOS processors, the
> >>>>>>>> community
> >>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> definitely want to enable queries on dirty reads;
> >>>>>>>> otherwise
> >>>>>>>>>>> users
> >>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>> have to wait 30 seconds (default) to see an update.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thank you for doing this fantastic work!
> >>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <
> >>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no
> >>>>>>>>> longer
> >>>>>>>>>>>>> include
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> StateStore management of checkpointing. This can be
> >>>>>>>> added
> >>>>>>>>>> as a
> >>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>> to further optimize the consistency and performance of
> >>>>>>>>> state
> >>>>>>>>>>>>> stores.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I've also added a section discussing some of the
> >>>>>>>> concerns
> >>>>>>>>>>> around
> >>>>>>>>>>>>>>>>>> concurrency, especially in the presence of Iterators.
> >>>>>>>> I'm
> >>>>>>>>>>>>> thinking of
> >>>>>>>>>>>>>>>>>> wrapping WriteBatchWithIndex with a reference-counting
> >>>>>>>>>>>>> copy-on-write
> >>>>>>>>>>>>>>>>>> implementation (that only makes a copy if there's an
> >>>>>>>>> active
> >>>>>>>>>>>>>>> iterator),
> >>>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>> I'm open to suggestions.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford <
> >>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Colt,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I didn't do any profiling, but the 844
> >>>>>>>> implementation:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>       - Writes uncommitted records to a temporary
> >>>>>>>> RocksDB
> >>>>>>>>>>>>> instance
> >>>>>>>>>>>>>>>>>>>          - Since tombstones need to be flagged, all
> >>>>>>>> record
> >>>>>>>>>>>>> values are
> >>>>>>>>>>>>>>>>>>>          prefixed with a value/tombstone marker. This
> >>>>>>>>>>>>> necessitates a
> >>>>>>>>>>>>>>>>> memory
> >>>>>>>>>>>>>>>>>> copy.
> >>>>>>>>>>>>>>>>>>>       - On-commit, iterates all records in this
> >>>>>>>> temporary
> >>>>>>>>>>>>> instance and
> >>>>>>>>>>>>>>>>>>>       writes them to the main RocksDB store.
> >>>>>>>>>>>>>>>>>>>       - While iterating, the value/tombstone marker
> >>>>>>>> needs
> >>>>>>>>> to
> >>>>>>>>>> be
> >>>>>>>>>>>>> parsed
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>       the real value extracted. This necessitates
> >>>>>>>> another
> >>>>>>>>>>> memory
> >>>>>>>>>>>>> copy.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> My guess is that the cost of iterating the temporary
> >>>>>>>>>> RocksDB
> >>>>>>>>>>>>> store
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> major factor, with the 2 extra memory copies
> >>>>>>>> per-Record
> >>>>>>>>>>>>>>> contributing
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> significant amount too.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:12, Colt McNealy <
> >>>>>>>>>>>>> c...@littlehorse.io>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Out of curiosity, why does the performance of the
> >>>>>>>> store
> >>>>>>>>>>>>> degrade so
> >>>>>>>>>>>>>>>>>>>> significantly with the 844 implementation? I
> >>>>>>>> wouldn't
> >>>>>>>>> be
> >>>>>>>>>>> too
> >>>>>>>>>>>>>>>> surprised
> >>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> 50-60% drop (caused by each record being written
> >>>>>>>>> twice),
> >>>>>>>>>>> but
> >>>>>>>>>>>>> 96%
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> extreme.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The only thing I can think of which could create
> >>>>>>>> such a
> >>>>>>>>>>>>> bottleneck
> >>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> that perhaps the 844 implementation deserializes and
> >>>>>>>>> then
> >>>>>>>>>>>>>>>>> re-serializes
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> store values when copying from the uncommitted to
> >>>>>>>>>> committed
> >>>>>>>>>>>>> store,
> >>>>>>>>>>>>>>>>> but I
> >>>>>>>>>>>>>>>>>>>> wasn't able to figure that out when I scanned the
> >>>>>>>> PR.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Colt McNealy
> >>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io*
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <
> >>>>>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I've updated the KIP to resolve all the points
> >>>>>>>> that
> >>>>>>>>>> have
> >>>>>>>>>>>>> been
> >>>>>>>>>>>>>>>> raised
> >>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>> far, with one exception: the ALOS default commit
> >>>>>>>>>> interval
> >>>>>>>>>>>>> of 5
> >>>>>>>>>>>>>>>>> minutes
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>> likely to cause WriteBatchWithIndex memory to grow
> >>>>>>>>> too
> >>>>>>>>>>>>> large.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> There's a couple of different things I can think
> >>>>>>>> of
> >>>>>>>>> to
> >>>>>>>>>>>>> solve
> >>>>>>>>>>>>>>> this:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>       - We already have a memory/record limit in the
> >>>>>>>> KIP
> >>>>>>>>>> to
> >>>>>>>>>>>>> prevent
> >>>>>>>>>>>>>>>> OOM
> >>>>>>>>>>>>>>>>>>>>>       errors. Should we choose a default value for
> >>>>>>>>> these?
> >>>>>>>>>> My
> >>>>>>>>>>>>>>> concern
> >>>>>>>>>>>>>>>>> here
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>       anything we choose might seem rather
> >>>>>>>> arbitrary. We
> >>>>>>>>>>> could
> >>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>>>>>>       its behaviour such that under ALOS, it only
> >>>>>>>>> triggers
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> commit
> >>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>       StateStore, but under EOS, it triggers a
> >>>>>>>> commit of
> >>>>>>>>>> the
> >>>>>>>>>>>>> Kafka
> >>>>>>>>>>>>>>>>>>>>> transaction.
> >>>>>>>>>>>>>>>>>>>>>       - We could introduce a separate `
> >>>>>>>>>>> checkpoint.interval.ms`
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>>>>> ALOS
> >>>>>>>>>>>>>>>>>>>>>       to commit the StateStores more frequently than
> >>>>>>>> the
> >>>>>>>>>>>>> general
> >>>>>>>>>>>>>>>>>>>>>       commit.interval.ms? My concern here is that
> >>>>>>>> the
> >>>>>>>>>>>>> semantics of
> >>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>> config
> >>>>>>>>>>>>>>>>>>>>>       would depend on the processing.mode; under
> >>>>>>>> ALOS it
> >>>>>>>>>>> would
> >>>>>>>>>>>>>>> allow
> >>>>>>>>>>>>>>>>> more
> >>>>>>>>>>>>>>>>>>>>>       frequently committing stores, whereas under
> >>>>>>>> EOS it
> >>>>>>>>>>>>> couldn't.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Any better ideas?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 16:25, Nick Telford <
> >>>>>>>>>>>>>>>> nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Alex,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I've updated the discussion of OOM issues by
> >>>>>>>>>> describing
> >>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>> we'll
> >>>>>>>>>>>>>>>>>>>> handle
> >>>>>>>>>>>>>>>>>>>>>> it. Here's the new text:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> To mitigate this, we will automatically force a
> >>>>>>>>> Task
> >>>>>>>>>>>>> commit if
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> total
> >>>>>>>>>>>>>>>>>>>>>>> uncommitted records returned by
> >>>>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedEntries()
> >>>>>>>>>>> exceeds a
> >>>>>>>>>>>>>>>>> threshold,
> >>>>>>>>>>>>>>>>>>>>>>> configured by
> >>>>>>>>>> max.uncommitted.state.entries.per.task;
> >>>>>>>>>>>>> or the
> >>>>>>>>>>>>>>>>> total
> >>>>>>>>>>>>>>>>>>>>>>> memory used for buffering uncommitted records
> >>>>>>>>>> returned
> >>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes()
> >>>>>>>>> exceeds
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> threshold
> >>>>>>>>>>>>>>>>>>>>>>> configured by
> >>>>>>>>> max.uncommitted.state.bytes.per.task.
> >>>>>>>>>>>>> This will
> >>>>>>>>>>>>>>>>>> roughly
> >>>>>>>>>>>>>>>>>>>>>>> bound the memory required per-Task for
> >>>>>>>> buffering
> >>>>>>>>>>>>> uncommitted
> >>>>>>>>>>>>>>>>>> records,
> >>>>>>>>>>>>>>>>>>>>>>> irrespective of the commit.interval.ms, and
> >>>>>>>> will
> >>>>>>>>>>>>> effectively
> >>>>>>>>>>>>>>>>> bound
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> number of records that will need to be
> >>>>>>>> restored in
> >>>>>>>>>> the
> >>>>>>>>>>>>> event
> >>>>>>>>>>>>>>>> of a
> >>>>>>>>>>>>>>>>>>>>> failure.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> These limits will be checked in
> >>>>>>>> StreamTask#process
> >>>>>>>>>> and
> >>>>>>>>>>> a
> >>>>>>>>>>>>>>>> premature
> >>>>>>>>>>>>>>>>>>>> commit
> >>>>>>>>>>>>>>>>>>>>>>> will be requested via Task#requestCommit().
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Note that these new methods provide default
> >>>>>>>>>>>>> implementations
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> ensure
> >>>>>>>>>>>>>>>>>>>>>>> existing custom stores and non-transactional
> >>>>>>>>> stores
> >>>>>>>>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>>>>>> InMemoryKeyValueStore) do not force any early
> >>>>>>>>>> commits.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I've chosen to have the StateStore expose
> >>>>>>>>>>> approximations
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>> buffer
> >>>>>>>>>>>>>>>>>>>>>> size/count instead of opaquely requesting a
> >>>>>>>> commit
> >>>>>>>>> in
> >>>>>>>>>>>>> order to
> >>>>>>>>>>>>>>>>>>>> delegate
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> decision making to the Task itself. This enables
> >>>>>>>>>> Tasks
> >>>>>>>>>>>>> to look
> >>>>>>>>>>>>>>>> at
> >>>>>>>>>>>>>>>>>>>> *all*
> >>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>> their StateStores, and determine whether an
> >>>>>>>> early
> >>>>>>>>>>> commit
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> necessary.
> >>>>>>>>>>>>>>>>>>>>>> Notably, it enables pre-Task thresholds,
> >>>>>>>> instead of
> >>>>>>>>>>>>> per-Store,
> >>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>> prevents Tasks with many StateStores from using
> >>>>>>>>> much
> >>>>>>>>>>> more
> >>>>>>>>>>>>>>> memory
> >>>>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>>> Tasks
> >>>>>>>>>>>>>>>>>>>>>> with one StateStore. This makes sense, since
> >>>>>>>>> commits
> >>>>>>>>>>> are
> >>>>>>>>>>>>> done
> >>>>>>>>>>>>>>>>>> by-Task,
> >>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>> by-Store.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Prizes* for anyone who can come up with a better
> >>>>>>>>> name
> >>>>>>>>>>>>> for the
> >>>>>>>>>>>>>>>> new
> >>>>>>>>>>>>>>>>>>>> config
> >>>>>>>>>>>>>>>>>>>>>> properties!
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for pointing out the potential
> >>>>>>>> performance
> >>>>>>>>>>> issues
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> WBWI.
> >>>>>>>>>>>>>>>>>> From
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> benchmarks that user posted[1], it looks like
> >>>>>>>> WBWI
> >>>>>>>>>>> still
> >>>>>>>>>>>>>>>> performs
> >>>>>>>>>>>>>>>>>>>>>> considerably better than individual puts, which
> >>>>>>>> is
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>> design,
> >>>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>> I'd actually expect a performance boost from
> >>>>>>>> WBWI,
> >>>>>>>>>> just
> >>>>>>>>>>>>> not as
> >>>>>>>>>>>>>>>>> great
> >>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>> we'd get from a plain WriteBatch. This does
> >>>>>>>> suggest
> >>>>>>>>>>> that
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>> good
> >>>>>>>>>>>>>>>>>>>>>> optimization would be to use a regular
> >>>>>>>> WriteBatch
> >>>>>>>>> for
> >>>>>>>>>>>>>>>> restoration
> >>>>>>>>>>>>>>>>>> (in
> >>>>>>>>>>>>>>>>>>>>>> RocksDBStore#restoreBatch), since we know that
> >>>>>>>>> those
> >>>>>>>>>>>>> records
> >>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> never
> >>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>> queried before they're committed.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>
> >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> * Just kidding, no prizes, sadly.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 12:28, Alexander
> >>>>>>>> Sorokoumov
> >>>>>>>>>>>>>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hey Nick,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP! With such a significant
> >>>>>>>>>>>>> performance
> >>>>>>>>>>>>>>>>>>>> degradation
> >>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>> the secondary store approach, we should
> >>>>>>>> definitely
> >>>>>>>>>>>>> consider
> >>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex. I also like encapsulating
> >>>>>>>>>>>>> checkpointing
> >>>>>>>>>>>>>>>>> inside
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> default state store implementation to improve
> >>>>>>>>>>>>> performance.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> +1 to John's comment to keep the current
> >>>>>>>>>> checkpointing
> >>>>>>>>>>>>> as a
> >>>>>>>>>>>>>>>>>> fallback
> >>>>>>>>>>>>>>>>>>>>>>> mechanism. We want to keep existing users'
> >>>>>>>>> workflows
> >>>>>>>>>>>>> intact
> >>>>>>>>>>>>>>> if
> >>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> can. A
> >>>>>>>>>>>>>>>>>>>>>>> non-intrusive way would be to add a separate
> >>>>>>>>>>> StateStore
> >>>>>>>>>>>>>>> method,
> >>>>>>>>>>>>>>>>>> say,
> >>>>>>>>>>>>>>>>>>>>>>> StateStore#managesCheckpointing(), that
> >>>>>>>> controls
> >>>>>>>>>>>>> whether the
> >>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>>>>> implementation owns checkpointing.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I think that a solution to the transactional
> >>>>>>>>> writes
> >>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>> address
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> OOMEs. One possible way to address that is to
> >>>>>>>> wire
> >>>>>>>>>>>>>>> StateStore's
> >>>>>>>>>>>>>>>>>>>> commit
> >>>>>>>>>>>>>>>>>>>>>>> request by adding, say, StateStore#commitNeeded
> >>>>>>>>> that
> >>>>>>>>>>> is
> >>>>>>>>>>>>>>> checked
> >>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>> StreamTask#commitNeeded via the corresponding
> >>>>>>>>>>>>>>>>>> ProcessorStateManager.
> >>>>>>>>>>>>>>>>>>>>> With
> >>>>>>>>>>>>>>>>>>>>>>> that change, RocksDBStore will have to track
> >>>>>>>> the
> >>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>> transaction
> >>>>>>>>>>>>>>>>>>>>> size
> >>>>>>>>>>>>>>>>>>>>>>> and request a commit when the size goes over a
> >>>>>>>>>>>>> (configurable)
> >>>>>>>>>>>>>>>>>>>> threshold.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> AFAIU WriteBatchWithIndex might perform
> >>>>>>>>>> significantly
> >>>>>>>>>>>>> slower
> >>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>>>> non-txn
> >>>>>>>>>>>>>>>>>>>>>>> puts as the batch size grows [1]. We should
> >>>>>>>> have a
> >>>>>>>>>>>>>>>> configuration
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> fall
> >>>>>>>>>>>>>>>>>>>>>>> back to the current behavior (and/or disable
> >>>>>>>> txn
> >>>>>>>>>>> stores
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> ALOS)
> >>>>>>>>>>>>>>>>>>>> unless
> >>>>>>>>>>>>>>>>>>>>>>> the benchmarks show negligible overhead for
> >>>>>>>> longer
> >>>>>>>>>>>>> commits /
> >>>>>>>>>>>>>>>>>>>>> large-enough
> >>>>>>>>>>>>>>>>>>>>>>> batch sizes.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> If you prefer to keep the KIP smaller, I would
> >>>>>>>>>> rather
> >>>>>>>>>>>>> cut out
> >>>>>>>>>>>>>>>>>>>>>>> state-store-managed checkpointing rather than
> >>>>>>>>> proper
> >>>>>>>>>>>>> OOMe
> >>>>>>>>>>>>>>>>> handling
> >>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>> being able to switch to non-txn behavior. The
> >>>>>>>>>>>>> checkpointing
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>> necessary to solve the recovery-under-EOS
> >>>>>>>> problem.
> >>>>>>>>>> On
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> other
> >>>>>>>>>>>>>>>>>> hand,
> >>>>>>>>>>>>>>>>>>>>> once
> >>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex is in, it will be much
> >>>>>>>> easier
> >>>>>>>>> to
> >>>>>>>>>>> add
> >>>>>>>>>>>>>>>>>>>>>>> state-store-managed checkpointing.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> If you share the current implementation, I am
> >>>>>>>>> happy
> >>>>>>>>>> to
> >>>>>>>>>>>>> help
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> address
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> OOMe and configuration parts as well as review
> >>>>>>>> and
> >>>>>>>>>>> test
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> patch.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>> Alex
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>> https://github.com/facebook/rocksdb/issues/608
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <
> >>>>>>>>>>>>>>>>>> nick.telf...@gmail.com
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi John,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the review and feedback!
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 1. Custom Stores: I've been mulling over this
> >>>>>>>>>>> problem
> >>>>>>>>>>>>>>> myself.
> >>>>>>>>>>>>>>>>> As
> >>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>> stands,
> >>>>>>>>>>>>>>>>>>>>>>>> custom stores would essentially lose
> >>>>>>>>> checkpointing
> >>>>>>>>>>>>> with no
> >>>>>>>>>>>>>>>>>>>> indication
> >>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>> they're expected to make changes, besides a
> >>>>>>>> line
> >>>>>>>>>> in
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> release
> >>>>>>>>>>>>>>>>>>>>> notes. I
> >>>>>>>>>>>>>>>>>>>>>>>> agree that the best solution would be to
> >>>>>>>>> provide a
> >>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>> checkpoints
> >>>>>>>>>>>>>>>>>>>>>>>> to a file. The one thing I would change is
> >>>>>>>> that
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>> checkpointing
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> store-local file, instead of a per-Task file.
> >>>>>>>>> This
> >>>>>>>>>>>>> way the
> >>>>>>>>>>>>>>>>>>>> StateStore
> >>>>>>>>>>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>>>>>>>>>> technically owns its own checkpointing (via a
> >>>>>>>>>>> default
> >>>>>>>>>>>>>>>>>>>> implementation),
> >>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> the StateManager/Task execution engine
> >>>>>>>> doesn't
> >>>>>>>>>> need
> >>>>>>>>>>>>> to know
> >>>>>>>>>>>>>>>>>>>> anything
> >>>>>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>> checkpointing, which greatly simplifies some
> >>>>>>>> of
> >>>>>>>>>> the
> >>>>>>>>>>>>> logic.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 2. OOME errors: The main reasons why I didn't
> >>>>>>>>>>> explore
> >>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>> solution
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> this is
> >>>>>>>>>>>>>>>>>>>>>>>> a) to keep this KIP as simple as possible,
> >>>>>>>> and
> >>>>>>>>> b)
> >>>>>>>>>>>>> because
> >>>>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>> exactly
> >>>>>>>>>>>>>>>>>>>>>>>> how to signal that a Task should commit
> >>>>>>>>>> prematurely.
> >>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>>> confident
> >>>>>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>>>>>>> possible, and I think it's worth adding a
> >>>>>>>>> section
> >>>>>>>>>> on
> >>>>>>>>>>>>>>> handling
> >>>>>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>>>>>>>> Besides
> >>>>>>>>>>>>>>>>>>>>>>>> my proposal to force an early commit once
> >>>>>>>> memory
> >>>>>>>>>>> usage
> >>>>>>>>>>>>>>>> reaches
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>> threshold,
> >>>>>>>>>>>>>>>>>>>>>>>> is there any other approach that you might
> >>>>>>>>> suggest
> >>>>>>>>>>> for
> >>>>>>>>>>>>>>>> tackling
> >>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>> problem?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 3. ALOS: I can add in an explicit paragraph,
> >>>>>>>> but
> >>>>>>>>>> my
> >>>>>>>>>>>>>>>> assumption
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>> since transactional behaviour comes at
> >>>>>>>> little/no
> >>>>>>>>>>>>> cost, that
> >>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>> available by default on all stores,
> >>>>>>>> irrespective
> >>>>>>>>>> of
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> processing
> >>>>>>>>>>>>>>>>>>>>> mode.
> >>>>>>>>>>>>>>>>>>>>>>>> While ALOS doesn't use transactions, the Task
> >>>>>>>>>> itself
> >>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>>>>>> "commits",
> >>>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>>> the behaviour should be correct under ALOS
> >>>>>>>> too.
> >>>>>>>>>> I'm
> >>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>> convinced
> >>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>>>>>>> worth having both
> >>>>>>>>> transactional/non-transactional
> >>>>>>>>>>>>> stores
> >>>>>>>>>>>>>>>>>>>> available, as
> >>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>> would considerably increase the complexity of
> >>>>>>>>> the
> >>>>>>>>>>>>> codebase,
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>>> little
> >>>>>>>>>>>>>>>>>>>>>>>> benefit.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> 4. Method deprecation: Are you referring to
> >>>>>>>>>>>>>>>>>>>> StateStore#getPosition()?
> >>>>>>>>>>>>>>>>>>>>>>> As I
> >>>>>>>>>>>>>>>>>>>>>>>> understand it, Position contains the
> >>>>>>>> position of
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> *source*
> >>>>>>>>>>>>>>>>>>>> topics,
> >>>>>>>>>>>>>>>>>>>>>>>> whereas the commit offsets would be the
> >>>>>>>>>> *changelog*
> >>>>>>>>>>>>>>> offsets.
> >>>>>>>>>>>>>>>> So
> >>>>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>>>>>>>>>> necessary to retain the Position data, as
> >>>>>>>> well
> >>>>>>>>> as
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>> changelog
> >>>>>>>>>>>>>>>>>>>>> offsets.
> >>>>>>>>>>>>>>>>>>>>>>>> What I meant in the KIP is that Position
> >>>>>>>> offsets
> >>>>>>>>>> are
> >>>>>>>>>>>>>>>> currently
> >>>>>>>>>>>>>>>>>>>> stored
> >>>>>>>>>>>>>>>>>>>>>>> in a
> >>>>>>>>>>>>>>>>>>>>>>>> file, and since we can atomically store
> >>>>>>>> metadata
> >>>>>>>>>>>>> along with
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> record
> >>>>>>>>>>>>>>>>>>>>>>>> batch we commit to RocksDB, we can move our
> >>>>>>>>>> Position
> >>>>>>>>>>>>>>> offsets
> >>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>> metadata too, and gain the same transactional
> >>>>>>>>>>>>> guarantees
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> changelog offsets, ensuring that the Position
> >>>>>>>>>>> offsets
> >>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>> consistent
> >>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>> the records that are read from the database.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Tue, 22 Nov 2022 at 16:25, John Roesler <
> >>>>>>>>>>>>>>>>> vvcep...@apache.org>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for publishing this alternative,
> >>>>>>>> Nick!
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> The benchmark you mentioned in the KIP-844
> >>>>>>>>>>>>> discussion
> >>>>>>>>>>>>>>> seems
> >>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>>> compelling reason to revisit the built-in
> >>>>>>>>>>>>>>> transactionality
> >>>>>>>>>>>>>>>>>>>>> mechanism.
> >>>>>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>>> also appreciate you analysis, showing that
> >>>>>>>> for
> >>>>>>>>>>> most
> >>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>> cases,
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> write
> >>>>>>>>>>>>>>>>>>>>>>>>> batch approach should be just fine.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> There are a couple of points that would
> >>>>>>>> hold
> >>>>>>>>> me
> >>>>>>>>>>>>> back from
> >>>>>>>>>>>>>>>>>>>> approving
> >>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>> KIP right now:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 1. Loss of coverage for custom stores.
> >>>>>>>>>>>>>>>>>>>>>>>>> The fact that you can plug in a
> >>>>>>>> (relatively)
> >>>>>>>>>>> simple
> >>>>>>>>>>>>>>>>>>>> implementation
> >>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> XStateStore interfaces and automagically
> >>>>>>>> get a
> >>>>>>>>>>>>>>> distributed
> >>>>>>>>>>>>>>>>>>>> database
> >>>>>>>>>>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>> it is a significant benefit of Kafka
> >>>>>>>> Streams.
> >>>>>>>>>> I'd
> >>>>>>>>>>>>> hate to
> >>>>>>>>>>>>>>>>> lose
> >>>>>>>>>>>>>>>>>>>> it,
> >>>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>> would be better to spend some time and
> >>>>>>>> come up
> >>>>>>>>>>> with
> >>>>>>>>>>>>> a way
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> preserve
> >>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>> property. For example, can we provide a
> >>>>>>>>> default
> >>>>>>>>>>>>>>>>> implementation
> >>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>> `commit(..)` that re-implements the
> >>>>>>>> existing
> >>>>>>>>>>>>>>>> checkpoint-file
> >>>>>>>>>>>>>>>>>>>>>>> approach? Or
> >>>>>>>>>>>>>>>>>>>>>>>>> perhaps add an `isTransactional()` flag to
> >>>>>>>> the
> >>>>>>>>>>> state
> >>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>> interface
> >>>>>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>>>> that the runtime can decide whether to
> >>>>>>>>> continue
> >>>>>>>>>> to
> >>>>>>>>>>>>> manage
> >>>>>>>>>>>>>>>>>>>> checkpoint
> >>>>>>>>>>>>>>>>>>>>>>>> files
> >>>>>>>>>>>>>>>>>>>>>>>>> vs delegating transactionality to the
> >>>>>>>> stores?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 2. Guarding against OOME
> >>>>>>>>>>>>>>>>>>>>>>>>> I appreciate your analysis, but I don't
> >>>>>>>> think
> >>>>>>>>>> it's
> >>>>>>>>>>>>>>>> sufficient
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> say
> >>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>> we will solve the memory problem later if
> >>>>>>>> it
> >>>>>>>>>>> becomes
> >>>>>>>>>>>>>>>>> necessary.
> >>>>>>>>>>>>>>>>>>>> The
> >>>>>>>>>>>>>>>>>>>>>>>>> experience leading to that situation would
> >>>>>>>> be
> >>>>>>>>>>> quite
> >>>>>>>>>>>>> bad:
> >>>>>>>>>>>>>>>>>> Imagine,
> >>>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>>> upgrade to AK 3.next, your tests pass, so
> >>>>>>>> you
> >>>>>>>>>>>>> deploy to
> >>>>>>>>>>>>>>>>>>>> production.
> >>>>>>>>>>>>>>>>>>>>>>> That
> >>>>>>>>>>>>>>>>>>>>>>>>> night, you get paged because your app is
> >>>>>>>> now
> >>>>>>>>>>>>> crashing
> >>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>> OOMEs. As
> >>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>> all OOMEs, you'll have a really hard time
> >>>>>>>>>> finding
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> root
> >>>>>>>>>>>>>>>>>> cause,
> >>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> once
> >>>>>>>>>>>>>>>>>>>>>>>>> you do, you won't have a clear path to
> >>>>>>>> resolve
> >>>>>>>>>> the
> >>>>>>>>>>>>> issue.
> >>>>>>>>>>>>>>>> You
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>>>>>>>>>>> tune down the commit interval and cache
> >>>>>>>> buffer
> >>>>>>>>>>> size
> >>>>>>>>>>>>> until
> >>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> stop
> >>>>>>>>>>>>>>>>>>>>>>>> getting
> >>>>>>>>>>>>>>>>>>>>>>>>> crashes.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> FYI, I know of multiple cases where people
> >>>>>>>> run
> >>>>>>>>>> EOS
> >>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>> larger
> >>>>>>>>>>>>>>>>>>>>>>>> commit
> >>>>>>>>>>>>>>>>>>>>>>>>> intervals to get better batching than the
> >>>>>>>>>> default,
> >>>>>>>>>>>>> so I
> >>>>>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>> pathological case would be as rare as you
> >>>>>>>>>> suspect.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Given that we already have the rudiments
> >>>>>>>> of an
> >>>>>>>>>>> idea
> >>>>>>>>>>>>> of
> >>>>>>>>>>>>>>> what
> >>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>> prevent this downside, we should take the
> >>>>>>>> time
> >>>>>>>>>> to
> >>>>>>>>>>>>> design
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> solution.
> >>>>>>>>>>>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>>>>>>>> owe
> >>>>>>>>>>>>>>>>>>>>>>>>> it to our users to ensure that awesome new
> >>>>>>>>>>> features
> >>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>> come
> >>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>> bitter
> >>>>>>>>>>>>>>>>>>>>>>>>> pills unless we can't avoid it.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 3. ALOS mode.
> >>>>>>>>>>>>>>>>>>>>>>>>> On the other hand, I didn't see an
> >>>>>>>> indication
> >>>>>>>>> of
> >>>>>>>>>>> how
> >>>>>>>>>>>>>>> stores
> >>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> handled under ALOS (aka non-EOS) mode.
> >>>>>>>>>>>>> Theoretically, the
> >>>>>>>>>>>>>>>>>>>>>>>> transactionality
> >>>>>>>>>>>>>>>>>>>>>>>>> of the store and the processing mode are
> >>>>>>>>>>>>> orthogonal. A
> >>>>>>>>>>>>>>>>>>>> transactional
> >>>>>>>>>>>>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>>>>>>>>>>>> would serve ALOS just as well as a
> >>>>>>>>>>>>> non-transactional one
> >>>>>>>>>>>>>>>> (if
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>> better).
> >>>>>>>>>>>>>>>>>>>>>>>>> Under ALOS, though, the default commit
> >>>>>>>>> interval
> >>>>>>>>>> is
> >>>>>>>>>>>>> five
> >>>>>>>>>>>>>>>>>> minutes,
> >>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> memory issue is far more pressing.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> As I see it, we have several options to
> >>>>>>>>> resolve
> >>>>>>>>>>> this
> >>>>>>>>>>>>>>> point.
> >>>>>>>>>>>>>>>>> We
> >>>>>>>>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>>>>>>>>>>>> demonstrate that transactional stores work
> >>>>>>>>> just
> >>>>>>>>>>>>> fine for
> >>>>>>>>>>>>>>>> ALOS
> >>>>>>>>>>>>>>>>>>>> and we
> >>>>>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>> therefore just swap over unconditionally.
> >>>>>>>> We
> >>>>>>>>>> could
> >>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>> disable
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> transactional mechanism under ALOS so that
> >>>>>>>>>> stores
> >>>>>>>>>>>>> operate
> >>>>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>>>> they do today when run in ALOS mode.
> >>>>>>>> Finally,
> >>>>>>>>> we
> >>>>>>>>>>>>> could do
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>> KIP-844 and make transactional stores
> >>>>>>>> opt-in
> >>>>>>>>>> (it'd
> >>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> better
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>> avoid
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> extra opt-in mechanism, but it's a good
> >>>>>>>>>>>>>>>> get-out-of-jail-free
> >>>>>>>>>>>>>>>>>>>> card).
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 4. (minor point) Deprecation of methods
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> You mentioned that the new `commit` method
> >>>>>>>>>>> replaces
> >>>>>>>>>>>>>>> flush,
> >>>>>>>>>>>>>>>>>>>>>>>>> updateChangelogOffsets, and checkpoint. It
> >>>>>>>>> seems
> >>>>>>>>>>> to
> >>>>>>>>>>>>> me
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> point
> >>>>>>>>>>>>>>>>>>>>>>>> about
> >>>>>>>>>>>>>>>>>>>>>>>>> atomicity and Position also suggests that
> >>>>>>>> it
> >>>>>>>>>>>>> replaces the
> >>>>>>>>>>>>>>>>>>>> Position
> >>>>>>>>>>>>>>>>>>>>>>>>> callbacks. However, the proposal only
> >>>>>>>>> deprecates
> >>>>>>>>>>>>> `flush`.
> >>>>>>>>>>>>>>>>>> Should
> >>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>>> deprecating other methods as well?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks again for the KIP! It's really nice
> >>>>>>>>> that
> >>>>>>>>>>> you
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> Alex
> >>>>>>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> chance to collaborate on both directions so
> >>>>>>>>> that
> >>>>>>>>>>> we
> >>>>>>>>>>>>> can
> >>>>>>>>>>>>>>> get
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> best
> >>>>>>>>>>>>>>>>>>>>>>>>> outcome for Streams and its users.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 2022/11/21 15:02:15 Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> As I mentioned in the discussion thread
> >>>>>>>> for
> >>>>>>>>>>>>> KIP-844,
> >>>>>>>>>>>>>>> I've
> >>>>>>>>>>>>>>>>>> been
> >>>>>>>>>>>>>>>>>>>>>>> working
> >>>>>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>>>> an alternative approach to achieving
> >>>>>>>> better
> >>>>>>>>>>>>>>> transactional
> >>>>>>>>>>>>>>>>>>>>> semantics
> >>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>> Kafka Streams StateStores.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I've published this separately as
> >>>>>>>> KIP-892:
> >>>>>>>>>>>>>>> Transactional
> >>>>>>>>>>>>>>>>>>>> Semantics
> >>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>> StateStores
> >>>>>>>>>>>>>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> >>>>>>>>>>>>>>>>>>>>>>>>>> ,
> >>>>>>>>>>>>>>>>>>>>>>>>>> so that it can be discussed/reviewed
> >>>>>>>>>> separately
> >>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>> KIP-844.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Alex: I'm especially interested in what
> >>>>>>>> you
> >>>>>>>>>>> think!
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> I have a nearly complete implementation
> >>>>>>>> of
> >>>>>>>>> the
> >>>>>>>>>>>>> changes
> >>>>>>>>>>>>>>>>>>>> outlined in
> >>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>> KIP, please let me know if you'd like me
> >>>>>>>> to
> >>>>>>>>>> push
> >>>>>>>>>>>>> them
> >>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>> review
> >>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>> advance
> >>>>>>>>>>>>>>>>>>>>>>>>>> of a vote.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Reply via email to