Hi everyone,

Quick update: I've added a new section to the KIP, "Offsets for Consumer
Rebalances", which outlines my solution to the problem that
StreamsPartitionAssignor needs to read StateStore offsets even when the
stores are not currently open.

Regards,
Nick

On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Bruno,
>
> Thanks for reviewing my proposal.
>
> 1.
> The main reason I added it was because it was easy to do. If we see no
> value in it, I can remove it.
>
> 2.
> Global StateStores can have multiple partitions in their input topics
> (which function as their changelogs), so they would have more than one
> partition.
>
> 3.
> That's a good point. At present, the only method it adds is
> isolationLevel(), which is likely not necessary outside of StateStores.
> It *does* provide slightly different guarantees in the documentation to
> several of the methods (hence the overrides). I'm not sure if this is
> enough to warrant a new interface though.
> I think the question that remains is whether this interface makes it
> easier to implement custom transactional StateStores than if we were to
> remove it? Probably not.
>
> 4.
> The main motivation for the Atomic Checkpointing is actually performance.
> My team has been testing out an implementation of this KIP without it, and
> we had problems with RocksDB doing *much* more compaction, due to the
> significantly increased flush rate. It was enough of a problem that (for
> the time being), we had to revert back to Kafka Streams proper.
> I think the best way to solve this, as you say, is to keep the .checkpoint
> files *in addition* to the offsets being stored within the store itself.
> Essentially, when closing StateStores, we force a memtable flush, then
> call getCommittedOffsets and write those out to the .checkpoint file.
> That would ensure the metadata is available to the
> StreamsPartitionAssignor for all closed stores.
> If there's a crash (no clean close), then we won't be able to guarantee
> which offsets were flushed to disk by RocksDB, so we'd need to open
> (init()), read offsets, and then close() those stores. But since this is
> the exception, and will only occur once (provided it doesn't crash every
> time!), I think the performance impact here would be acceptable.
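
A minimal sketch of the close/crash behaviour described in point 4, using an in-memory stand-in rather than the real Kafka Streams classes. The class name, the String partition keys and the idea that init() removes a stale checkpoint file are assumptions made for illustration; only getCommittedOffsets(), init() and close() are names taken from the discussion. The model also treats every commit as durable, which a real store would not guarantee after a crash.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: offsets live inside the store, and a clean close also
// writes them to a ".checkpoint" file that can be read without opening the store.
final class CheckpointOnCloseSketch {
    private final Map<String, Long> offsetsInStore = new HashMap<>();
    private Map<String, Long> checkpointFile = null; // null means no .checkpoint file
    int opensJustForOffsets = 0;

    void init() {
        // Assumption: the file is removed while the store is open, so a crash
        // can never leave a stale file behind.
        checkpointFile = null;
    }

    void commit(Map<String, Long> changelogOffsets) {
        offsetsInStore.putAll(changelogOffsets);
    }

    Map<String, Long> getCommittedOffsets() {
        return new HashMap<>(offsetsInStore);
    }

    // Clean close: (force a memtable flush, then) write offsets to the file.
    void close() {
        checkpointFile = getCommittedOffsets();
    }

    // Crash: nothing is written, so no checkpoint file exists afterwards.
    void crash() {
    }

    // What a rebalance would do for a store that is not currently open.
    Map<String, Long> offsetsForRebalance() {
        if (checkpointFile != null) {
            return new HashMap<>(checkpointFile); // cheap path, store stays closed
        }
        opensJustForOffsets++;                     // exceptional path after a crash
        init();
        Map<String, Long> offsets = getCommittedOffsets();
        close();                                   // the file now exists for next time
        return offsets;
    }
}
```

In this model the expensive open/read/close path runs once after a crash; the close() at the end of it writes the file, so later rebalances take the cheap path again.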
>
> Thanks for the feedback, please let me know if you have any more comments
> or questions!
>
> I'm currently working on rebasing against trunk. This involves adding
> support for transactionality to VersionedStateStores. I will probably need
> to revise my implementation for transactional "segmented" stores, both to
> accommodate VersionedStateStore, and to clean up some other stuff.
>
> Regards,
> Nick
>
>
> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> wrote:
>
>> Hi Nick,
>>
>> Thanks for the updates!
>>
>> I have a couple of questions/comments.
>>
>> 1.
>> Why do you propose a configuration that involves max. bytes and max.
>> records? I think we are mainly concerned about memory consumption because
>> we want to limit the off-heap memory used. I cannot think of a case
>> where one would want to set the max. number of records.
>>
>>
>> 2.
>> Why does
>>
>>   default void commit(final Map<TopicPartition, Long> changelogOffsets) {
>>       flush();
>>   }
>>
>> take a map of partitions to changelog offsets?
>> The mapping between state stores and partitions is a 1:1 relationship.
>> Passing in a single changelog offset should suffice.
>>
>>
>> 3.
>> Why do we need the Transaction interface? It should be possible to hide
>> beginning and committing a transaction within the state store
>> implementation, so that from outside the state store, it does not matter
>> whether the state store is transactional or not. What would be the
>> advantage of using the Transaction interface?
>>
>>
>> 4.
>> Regarding checkpointing offsets, I think we should keep the checkpoint
>> file in any case for the reason you mentioned about rebalancing. Even if
>> that would not be an issue, I would propose to move the change to offset
>> management to a new KIP and to not add more complexity than needed to
>> this one. I would not be too concerned about the consistency violation
>> you mention. As far as I understand, with transactional state stores
>> Streams would write the checkpoint file during every commit even under
>> EOS. In the failure case you describe, Streams would restore the state
>> stores from the offsets found in the checkpoint file written during the
>> penultimate commit instead of during the last commit. Basically, Streams
>> would overwrite the records written to the state store between the last
>> two commits with the same records read from the changelogs. While I
>> understand that this is wasteful, it is -- at the same time --
>> acceptable and most importantly it does not break EOS.
>>
>> Best,
>> Bruno
>>
>>
>> On 27.04.23 12:34, Nick Telford wrote:
>> > Hi everyone,
>> >
>> > I find myself (again) considering removing the offset management from
>> > StateStores, and keeping the old checkpoint file system. The reason is
>> > that the StreamsPartitionAssignor directly reads checkpoint files in
>> > order to determine which instance has the most up-to-date copy of the
>> > local state. If we move offsets into the StateStore itself, then we will
>> > need to open, initialize, read offsets and then close each StateStore
>> > (that is not already assigned and open) for which we have *any* local
>> > state, on every rebalance.
>> >
>> > Generally, I don't think there are many "orphan" stores like this
>> sitting
>> > around on most instances, but even a few would introduce additional
>> latency
>> > to an already somewhat lengthy rebalance procedure.
>> >
>> > I'm leaning towards Colt's (Slack) suggestion of just keeping things in
>> the
>> > checkpoint file(s) for now, and not worrying about the race. The
>> downside
>> > is that we wouldn't be able to remove the explicit RocksDB flush
>> on-commit,
>> > which likely hurts performance.
>> >
>> > If anyone has any thoughts or ideas on this subject, I would appreciate
>> it!
>> >
>> > Regards,
>> > Nick
>> >
>> > On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com>
>> wrote:
>> >
>> >> Hi Colt,
>> >>
>> >> The issue is that if there's a crash between 2 and 3, then you still
>> >> end up with inconsistent data in RocksDB. The only way to guarantee
>> >> that your checkpoint offsets and locally stored data are consistent
>> >> with each other is to atomically commit them, which can be achieved by
>> >> having the offsets stored in RocksDB.
>> >>
>> >> The offsets column family is likely to be extremely small (one
>> >> per-changelog partition + one per Topology input partition for regular
>> >> stores, one per input partition for global stores). So the overhead
>> will be
>> >> minimal.
>> >>
>> >> A major benefit of doing this is that we can remove the explicit calls
>> to
>> >> db.flush(), which forcibly flushes memtables to disk on-commit. It
>> turns
>> >> out, RocksDB memtable flushes are largely dictated by Kafka Streams
>> >> commits, *not* RocksDB configuration, which could be a major source of
>> >> confusion. Atomic checkpointing makes it safe to remove these explicit
>> >> flushes, because it no longer matters exactly when RocksDB flushes
>> data to
>> >> disk; since the data and corresponding checkpoint offsets will always
>> be
>> >> flushed together, the local store is always in a consistent state, and
>> >> on-restart, it can always safely resume restoration from the on-disk
>> >> offsets, restoring the small amount of data that hadn't been flushed
>> when
>> >> the app exited/crashed.
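
A rough illustration of why flush timing stops mattering once offsets travel with the data. This is an in-memory model, not RocksDB: the "memtable" and "disk" maps, the class name and the String partition keys are all assumptions made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: the "mem" maps are lost on a crash, the "disk" maps survive.
final class AtomicCheckpointSketch {
    private final Map<String, String> memData = new HashMap<>();
    private final Map<String, Long> memOffsets = new HashMap<>();
    final Map<String, String> diskData = new HashMap<>();
    final Map<String, Long> diskOffsets = new HashMap<>();

    // Commit records together with the changelog offsets they correspond to.
    void commit(Map<String, String> records, Map<String, Long> changelogOffsets) {
        memData.putAll(records);
        memOffsets.putAll(changelogOffsets);
    }

    // The flush can happen at any time; data and offsets always move together.
    void flush() {
        diskData.putAll(memData);
        diskOffsets.putAll(memOffsets);
        memData.clear();
        memOffsets.clear();
    }

    // Crash: everything that had not been flushed is gone.
    void crash() {
        memData.clear();
        memOffsets.clear();
    }

    // Offset from which restoration resumes after a restart.
    long restoreFrom(String partition) {
        return diskOffsets.getOrDefault(partition, 0L);
    }
}
```

After a crash, the on-disk offset always describes exactly the on-disk data, so restoration only replays the records committed since the last flush.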
>> >>
>> >> Regards,
>> >> Nick
>> >>
>> >> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io>
>> wrote:
>> >>
>> >>> Nick,
>> >>>
>> >>> Thanks for your reply. Ack to A) and B).
>> >>>
>> >>> For item C), I see what you're referring to. Your proposed solution
>> will
>> >>> work, so no need to change it. What I was suggesting was that it
>> might be
>> >>> possible to achieve this with only one column family. So long as:
>> >>>
>> >>>     - No uncommitted records (i.e. not committed to the changelog) are
>> >>>     *committed* to the state store, AND
>> >>>     - The Checkpoint offset (which refers to the changelog topic) is
>> less
>> >>>     than or equal to the last written changelog offset in rocksdb
>> >>>
>> >>> I don't see the need to do the full restoration from scratch. My
>> >>> understanding was that prior to 844/892, full restorations were
>> required
>> >>> because there could be uncommitted records written to RocksDB;
>> however,
>> >>> given your use of RocksDB transactions, that can be avoided with the
>> >>> pattern of 1) commit Kafka transaction, 2) commit RocksDB
>> transaction, 3)
>> >>> update offset in checkpoint file.
>> >>>
>> >>> Anyways, your proposed solution works equivalently and I don't believe
>> >>> there is much overhead to an additional column family in RocksDB.
>> Perhaps
>> >>> it may even perform better than making separate writes to the
>> checkpoint
>> >>> file.
>> >>>
>> >>> Colt McNealy
>> >>> *Founder, LittleHorse.io*
>> >>>
>> >>>
>> >>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <nick.telf...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hi Colt,
>> >>>>
>> >>>> A. I've done my best to de-couple the StateStore stuff from the rest
>> of
>> >>> the
>> >>>> Streams engine. The fact that there will be only one ongoing (write)
>> >>>> transaction at a time is not guaranteed by any API, and is just a
>> >>>> consequence of the way Streams operates. To that end, I tried to
>> ensure
>> >>> the
>> >>>> documentation and guarantees provided by the new APIs are
>> independent of
>> >>>> this incidental behaviour. In practice, you're right, this
>> essentially
>> >>>> refers to "interactive queries", which are technically "read
>> >>> transactions",
>> >>>> even if they don't actually use the transaction API to isolate
>> >>> themselves.
>> >>>>
>> >>>> B. Yes, although not ideal. This is for backwards compatibility,
>> >>> because:
>> >>>>      1) Existing custom StateStore implementations will implement
>> >>> flush(),
>> >>>> and not commit(), but the Streams engine now calls commit(), so those
>> >>> calls
>> >>>> need to be forwarded to flush() for these legacy stores.
>> >>>>      2) Existing StateStore *users*, i.e. outside of the Streams
>> engine
>> >>>> itself, may depend on explicitly calling flush(), so for these cases,
>> >>>> flush() needs to be redirected to call commit().
>> >>>> If anyone has a better way to guarantee compatibility without
>> >>> introducing
>> >>>> this potential recursion loop, I'm open to changes!
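
The mutual delegation in B can be sketched as below. This uses a simplified, hypothetical interface (String keys instead of TopicPartition, and not the real StateStore), so it shows only the shape of the compatibility shim, not the actual KIP code.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the StateStore interface.
interface SketchStore {
    // Legacy entry point: by default it forwards to commit().
    default void flush() {
        commit(Map.of());
    }

    // New entry point: by default it forwards to flush(), for legacy stores.
    default void commit(Map<String, Long> changelogOffsets) {
        flush();
    }
}

// A legacy store overrides only flush(); engine calls to commit() still reach it.
final class LegacyStore implements SketchStore {
    int flushes = 0;

    @Override
    public void flush() {
        flushes++;
    }
}

// A new-style store overrides only commit(); user calls to flush() still reach it.
final class TransactionalStore implements SketchStore {
    int commits = 0;

    @Override
    public void commit(Map<String, Long> changelogOffsets) {
        commits++;
    }
}
```

An implementation that overrides neither method would recurse until the stack overflows, which is the loop Colt asked about; overriding either one breaks the cycle.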
>> >>>>
>> >>>> C. This is described in the "Atomic Checkpointing" section. Offsets
>> are
>> >>>> stored in a separate RocksDB column family, which is guaranteed to be
>> >>>> atomically flushed to disk with all other column families. The issue
>> of
>> >>>> checkpoints being written to disk after commit causing inconsistency
>> if
>> >>> it
>> >>>> crashes in between is the reason why, under EOS, checkpoint files are
>> >>> only
>> >>>> written on clean shutdown. This is one of the major causes of "full
>> >>>> restorations", so moving the offsets into a place where they can be
>> >>>> guaranteed to be atomically written with the data they checkpoint
>> >>> allows us
>> >>>> to write the checkpoint offsets *on every commit*, not just on clean
>> >>>> shutdown.
>> >>>>
>> >>>> Regards,
>> >>>> Nick
>> >>>>
>> >>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io>
>> wrote:
>> >>>>
>> >>>>> Nick,
>> >>>>>
>> >>>>> Thank you for continuing this work. I have a few minor clarifying
>> >>>>> questions.
>> >>>>>
>> >>>>> A) "Records written to any transaction are visible to all other
>> >>>>> transactions immediately." I am confused here—I thought there could
>> >>> only
>> >>>> be
>> >>>>> one transaction going on at a time for a given state store given the
>> >>>>> threading model for processing records on a Task. Do you mean
>> >>> Interactive
>> >>>>> Queries by "other transactions"? (If so, then everything makes
>> sense—I
>> >>>>> thought that since IQ were read-only then they didn't count as
>> >>>>> transactions).
>> >>>>>
>> >>>>> B) Is it intentional that the default implementations of the flush()
>> >>> and
>> >>>>> commit() methods in the StateStore class refer to each other in some
>> >>> sort
>> >>>>> of unbounded recursion?
>> >>>>>
>> >>>>> C) How will the getCommittedOffset() method work? At first I thought
>> >>> the
>> >>>>> way to do it would be using a special key in the RocksDB store to
>> >>> store
>> >>>> the
>> >>>>> offset, and committing that with the transaction. But upon second
>> >>>> thought,
>> >>>>> since restoration from the changelog is an idempotent procedure, I
>> >>> think
>> >>>> it
>> >>>>> would be fine to 1) commit the RocksDB transaction and then 2) write
>> >>> the
>> >>>>> offset to disk in a checkpoint file. If there is a crash between 1)
>> >>> and
>> >>>> 2),
>> >>>>> I think the only downside is now we replay a few more records (at a
>> >>> cost
>> >>>> of
>> >>>>> <100ms). Am I missing something there?
>> >>>>>
>> >>>>> Other than that, everything makes sense to me.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Colt McNealy
>> >>>>> *Founder, LittleHorse.io*
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford <
>> nick.telf...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Hi everyone,
>> >>>>>>
>> >>>>>> I've updated the KIP to reflect the latest version of the design:
>> >>>>>>
>> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> >>>>>>
>> >>>>>> There are several changes in there that reflect feedback from this
>> >>>>> thread,
>> >>>>>> and there's a new section and a bunch of interface changes relating
>> >>> to
>> >>>>>> Atomic Checkpointing, which is the final piece of the puzzle to
>> >>> making
>> >>>>>> everything robust.
>> >>>>>>
>> >>>>>> Let me know what you think!
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Nick
>> >>>>>>
>> >>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford <nick.telf...@gmail.com>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi Lucas,
>> >>>>>>>
>> >>>>>>> Thanks for looking over my KIP.
>> >>>>>>>
>> >>>>>>> A) The bound is per-instance, not per-Task. This was a typo in the
>> >>>> KIP
>> >>>>>>> that I've now corrected. It was originally per-Task, but I
>> >>> changed it
>> >>>>> to
>> >>>>>>> per-instance for exactly the reason you highlighted.
>> >>>>>>> B) It's worth noting that transactionality is only enabled under
>> >>> EOS,
>> >>>>> and
>> >>>>>>> in the default mode of operation (ALOS), there should be no
>> >>> change in
>> >>>>>>> behavior at all. I think, under EOS, we can mitigate the impact on
>> >>>>> users
>> >>>>>> by
>> >>>>>>> sufficiently low default values for the memory bound
>> >>> configuration. I
>> >>>>>>> understand your hesitation to include a significant change of
>> >>>>> behaviour,
>> >>>>>>> especially in a minor release, but I suspect that most users will
>> >>>>> prefer
>> >>>>>>> the memory impact (under EOS) to the existing behaviour of
>> >>> frequent
>> >>>>> state
>> >>>>>>> restorations! If this is a problem, the changes can wait until the
>> >>>> next
>> >>>>>>> major release. I'll be running a patched version of streams in
>> >>>>> production
>> >>>>>>> with these changes as soon as they're ready, so it won't disrupt
>> >>> me
>> >>>> :-D
>> >>>>>>> C) The main purpose of this sentence was just to note that some
>> >>>> changes
>> >>>>>>> will need to be made to the way Segments are handled in order to
>> >>>> ensure
>> >>>>>>> they also benefit from transactions. At the time I wrote it, I
>> >>> hadn't
>> >>>>>>> figured out the specific changes necessary, so it was deliberately
>> >>>>> vague.
>> >>>>>>> This is the one outstanding problem I'm currently working on, and
>> >>>> I'll
>> >>>>>>> update this section with more detail once I have figured out the
>> >>>> exact
>> >>>>>>> changes required.
>> >>>>>>> D) newTransaction() provides the necessary isolation guarantees.
>> >>>> While
>> >>>>>>> the RocksDB implementation of transactions doesn't technically
>> >>> *need*
>> >>>>>>> read-only users to call newTransaction(), other implementations
>> >>>> (e.g. a
>> >>>>>>> hypothetical PostgresStore) may require it. Calling
>> >>> newTransaction()
>> >>>>> when
>> >>>>>>> no transaction is necessary is essentially free, as it will just
>> >>>> return
>> >>>>>>> this.
>> >>>>>>>
>> >>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think it
>> >>> should
>> >>>> be
>> >>>>>>> fairly obvious where the performance problems stem from: writes
>> >>> under
>> >>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with the
>> >>>>>>> tombstone/record flag, 1 to decode it from the tombstone/record
>> >>> flag,
>> >>>>>> and 1
>> >>>>>>> to copy the record from the "temporary" store to the "main" store,
>> >>>> when
>> >>>>>> the
>> >>>>>>> transaction commits. The different approach taken by KIP-892
>> >>> should
>> >>>>>> perform
>> >>>>>>> much better, as it avoids all these copies, and may actually
>> >>> perform
>> >>>>>>> slightly better than trunk, due to batched writes in RocksDB
>> >>>> performing
>> >>>>>>> better than non-batched writes.[1]
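
To make the first two of those copies concrete, here is a small sketch of a value/tombstone flag encoding. The flag values and method names are assumptions for illustration and are not taken from the KIP-844 PoC.

```java
import java.util.Arrays;

// Hypothetical encoding: one leading byte marks a stored entry as value or tombstone.
final class TombstoneFlagSketch {
    static final byte TOMBSTONE = 0;
    static final byte VALUE = 1;

    // Copy 1: the value is copied into a new array behind the flag byte.
    static byte[] encode(byte[] value) {
        if (value == null) {
            return new byte[] {TOMBSTONE};
        }
        byte[] out = new byte[value.length + 1];
        out[0] = VALUE;
        System.arraycopy(value, 0, out, 1, value.length);
        return out;
    }

    // Copy 2: the real value is copied back out when the flag is stripped.
    static byte[] decode(byte[] stored) {
        if (stored[0] == TOMBSTONE) {
            return null; // a deletion
        }
        return Arrays.copyOfRange(stored, 1, stored.length);
    }
}
```

The third copy in the description above is the write of the decoded value from the temporary store into the main store on commit, which this sketch does not model.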
>> >>>>>>>
>> >>>>>>> Regards,
>> >>>>>>> Nick
>> >>>>>>>
>> >>>>>>> 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>> >>>>>>>
>> >>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <lbruts...@confluent.io.invalid>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Nick,
>> >>>>>>>>
>> >>>>>>>> I'm just starting to read up on the whole discussion about
>> >>> KIP-892
>> >>>> and
>> >>>>>>>> KIP-844. Thanks a lot for your work on this, I do think
>> >>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do have some
>> >>>>>>>> questions about the latest draft.
>> >>>>>>>>
>> >>>>>>>>   A) If I understand correctly, you propose to put a bound on the
>> >>>>>>>> (native) memory consumed by each task. However, I wonder if this
>> >>> is
>> >>>>>>>> sufficient if we have temporary imbalances in the cluster. For
>> >>>>>>>> example, depending on the timing of rebalances during a cluster
>> >>>>>>>> restart, it could happen that a single streams node is assigned a
>> >>>> lot
>> >>>>>>>> more tasks than expected. With your proposed change, this would
>> >>> mean
>> >>>>>>>> that the memory required by this one node could be a multiple of
>> >>>> what
>> >>>>>>>> is required during normal operation. I wonder if it wouldn't be
>> >>>> safer
>> >>>>>>>> to put a global bound on the memory use, across all tasks.
>> >>>>>>>>   B) Generally, the memory concerns still give me the feeling
>> that
>> >>>> this
>> >>>>>>>> should not be enabled by default for all users in a minor
>> >>> release.
>> >>>>>>>>   C) In section "Transaction Management": the sentence "A similar
>> >>>>>>>> analogue will be created to automatically manage `Segment`
>> >>>>>>>> transactions.". Maybe this is just me lacking some background,
>> >>> but I
>> >>>>>>>> do not understand this, it would be great if you could clarify
>> >>> what
>> >>>>>>>> you mean here.
>> >>>>>>>>   D) Could you please clarify why IQ has to call
>> newTransaction(),
>> >>>> when
>> >>>>>>>> it's read-only.
>> >>>>>>>>
>> >>>>>>>> And one last thing not strictly related to your KIP: if there is
>> >>> an
>> >>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x slower
>> >>> (e.g.
>> >>>>>>>> by providing a flame graph), that would be quite interesting.
>> >>>>>>>>
>> >>>>>>>> Cheers,
>> >>>>>>>> Lucas
>> >>>>>>>>
>> >>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <
>> >>>> nick.telf...@gmail.com>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi everyone,
>> >>>>>>>>>
>> >>>>>>>>> I've updated the KIP with a more detailed design, which
>> >>> reflects
>> >>>> the
>> >>>>>>>>> implementation I've been working on:
>> >>>>>>>>>
>> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> >>>>>>>>>
>> >>>>>>>>> This new design should address the outstanding points already
>> >>> made
>> >>>>> in
>> >>>>>>>> the
>> >>>>>>>>> thread.
>> >>>>>>>>>
>> >>>>>>>>> Please let me know if there are areas that are unclear or need
>> >>>> more
>> >>>>>>>>> clarification.
>> >>>>>>>>>
>> >>>>>>>>> I have a (nearly) working implementation. I'm confident that
>> >>> the
>> >>>>>>>> remaining
>> >>>>>>>>> work (making Segments behave) will not impact the documented
>> >>>> design.
>> >>>>>>>>>
>> >>>>>>>>> Regards,
>> >>>>>>>>>
>> >>>>>>>>> Nick
>> >>>>>>>>>
>> >>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io
>> >>>>
>> >>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Nick,
>> >>>>>>>>>>
>> >>>>>>>>>> Thank you for the reply; that makes sense. I was hoping that,
>> >>>>> since
>> >>>>>>>> reading
>> >>>>>>>>>> uncommitted records from IQ in EOS isn't part of the
>> >>> documented
>> >>>>> API,
>> >>>>>>>> maybe
>> >>>>>>>>>> you *wouldn't* have to wait for the next major release to
>> >>> make
>> >>>>> that
>> >>>>>>>> change;
>> >>>>>>>>>> but given that it would be considered a major change, I like
>> >>>> your
>> >>>>>>>> approach
>> >>>>>>>>>> the best.
>> >>>>>>>>>>
>> >>>>>>>>>> Wishing you a speedy recovery and happy coding!
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks,
>> >>>>>>>>>> Colt McNealy
>> >>>>>>>>>> *Founder, LittleHorse.io*
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <
>> >>>>>> nick.telf...@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Colt,
>> >>>>>>>>>>>
>> >>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended to
>> >>> try
>> >>>> to
>> >>>>>>>> keep the
>> >>>>>>>>>>> behaviour unchanged as much as possible, otherwise we'd
>> >>> have
>> >>>> to
>> >>>>>>>> wait for
>> >>>>>>>>>> a
>> >>>>>>>>>>> major version release to land these changes.
>> >>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of
>> >>> guarantee,
>> >>>>> and
>> >>>>>>>> the
>> >>>>>>>>>>> typically longer commit intervals would be problematic when
>> >>>>>> reading
>> >>>>>>>> only
>> >>>>>>>>>>> "committed" records.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I've been away for 5 days recovering from minor surgery,
>> >>> but I
>> >>>>>>>> spent a
>> >>>>>>>>>>> considerable amount of that time working through ideas for
>> >>>>>> possible
>> >>>>>>>>>>> solutions in my head. I think your suggestion of keeping
>> >>> ALOS
>> >>>>>>>> as-is, but
>> >>>>>>>>>>> buffering writes for EOS is the right path forwards,
>> >>> although
>> >>>> I
>> >>>>>>>> have a
>> >>>>>>>>>>> solution that both expands on this, and provides for some
>> >>> more
>> >>>>>>>> formal
>> >>>>>>>>>>> guarantees.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Essentially, adding support to KeyValueStores for
>> >>>>> "Transactions",
>> >>>>>>>> with
>> >>>>>>>>>>> clearly defined IsolationLevels. Using "Read Committed"
>> >>> when
>> >>>>> under
>> >>>>>>>> EOS,
>> >>>>>>>>>> and
>> >>>>>>>>>>> "Read Uncommitted" under ALOS.
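
One way to picture the two isolation levels is a store that buffers writes until commit. The sketch below is an assumption-laden toy (plain maps, no deletes, hypothetical names), not the proposed Transaction API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical isolation levels for the sketch.
enum SketchIsolation { READ_UNCOMMITTED, READ_COMMITTED }

// Hypothetical store that buffers writes until commit(); deletes are not modelled.
final class BufferedStoreSketch {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();

    // "Read Committed" under EOS, "Read Uncommitted" under ALOS.
    static SketchIsolation isolationFor(boolean exactlyOnce) {
        return exactlyOnce ? SketchIsolation.READ_COMMITTED : SketchIsolation.READ_UNCOMMITTED;
    }

    void put(String key, String value) {
        uncommitted.put(key, value);
    }

    void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
    }

    // Readers see buffered writes only at READ_UNCOMMITTED.
    String get(String key, SketchIsolation level) {
        if (level == SketchIsolation.READ_UNCOMMITTED && uncommitted.containsKey(key)) {
            return uncommitted.get(key);
        }
        return committed.get(key);
    }
}
```

With this split, an ALOS reader sees an update immediately, while an EOS reader sees it only after the commit.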
>> >>>>>>>>>>>
>> >>>>>>>>>>> The nice thing about this approach is that it gives us much
>> >>>> more
>> >>>>>>>> clearly
>> >>>>>>>>>>> defined isolation behaviour that can be properly
>> >>> documented to
>> >>>>>>>> ensure
>> >>>>>>>>>> users
>> >>>>>>>>>>> know what to expect.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I'm still working out the kinks in the design, and will
>> >>> update
>> >>>>> the
>> >>>>>>>> KIP
>> >>>>>>>>>> when
>> >>>>>>>>>>> I have something. The main struggle is trying to implement
>> >>>> this
>> >>>>>>>> without
>> >>>>>>>>>>> making any major changes to the existing interfaces or
>> >>>> breaking
>> >>>>>>>> existing
>> >>>>>>>>>>> implementations, because currently everything expects to
>> >>>> operate
>> >>>>>>>> directly
>> >>>>>>>>>>> on a StateStore, and not a Transaction of that store. I
>> >>> think
>> >>>>> I'm
>> >>>>>>>> getting
>> >>>>>>>>>>> close, although sadly I won't be able to progress much
>> >>> until
>> >>>>> next
>> >>>>>>>> week
>> >>>>>>>>>> due
>> >>>>>>>>>>> to some work commitments.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Regards,
>> >>>>>>>>>>> Nick
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy <
>> >>>> c...@littlehorse.io>
>> >>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Nick,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thank you for the explanation, and also for the updated
>> >>>> KIP. I
>> >>>>>> am
>> >>>>>>>> quite
>> >>>>>>>>>>>> eager for this improvement to be released as it would
>> >>>> greatly
>> >>>>>>>> reduce
>> >>>>>>>>>> the
>> >>>>>>>>>>>> operational difficulties of EOS streams apps.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Two questions:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 10)
>> >>>>>>>>>>>>> When reading records, we will use the
>> >>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB
>> >>>>>>>>>>>>   and WriteBatchWithIndex#newIteratorWithBase utilities in
>> >>>>> order
>> >>>>>> to
>> >>>>>>>>>> ensure
>> >>>>>>>>>>>> that uncommitted writes are available to query.
>> >>>>>>>>>>>> Why do extra work to enable the reading of uncommitted
>> >>>> writes
>> >>>>>>>> during
>> >>>>>>>>>> IQ?
>> >>>>>>>>>>>> Code complexity aside, reading uncommitted writes is, in
>> >>> my
>> >>>>>>>> opinion, a
>> >>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have the
>> >>>>>> guarantee
>> >>>>>>>> that,
>> >>>>>>>>>>>> with EOS, IQ only reads committed records. In order to
>> >>> avoid
>> >>>>>> dirty
>> >>>>>>>>>> reads,
>> >>>>>>>>>>>> one currently must query a standby replica (but this
>> >>> still
>> >>>>>> doesn't
>> >>>>>>>>>> fully
>> >>>>>>>>>>>> guarantee monotonic reads).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 20) Is it also necessary to enable this optimization on ALOS
>> >>>>>>>>>>>> stores? The motivation of KIP-844 was mainly to reduce the need
>> >>>>>>>>>>>> to restore state from scratch on unclean EOS shutdowns; with
>> >>>>>>>>>>>> ALOS it was acceptable that there may have been uncommitted
>> >>>>>>>>>>>> writes on disk. On a side note, if you enable this type of store
>> >>>>>>>>>>>> on ALOS processors, the community would definitely want to
>> >>>>>>>>>>>> enable queries on dirty reads; otherwise users would have to
>> >>>>>>>>>>>> wait 30 seconds (default) to see an update.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thank you for doing this fantastic work!
>> >>>>>>>>>>>> Colt McNealy
>> >>>>>>>>>>>> *Founder, LittleHorse.io*
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <
>> >>>>>>>> nick.telf...@gmail.com>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no
>> >>>> longer
>> >>>>>>>> include
>> >>>>>>>>>> the
>> >>>>>>>>>>>>> StateStore management of checkpointing. This can be
>> >>> added
>> >>>>> as a
>> >>>>>>>> KIP
>> >>>>>>>>>>> later
>> >>>>>>>>>>>> on
>> >>>>>>>>>>>>> to further optimize the consistency and performance of
>> >>>> state
>> >>>>>>>> stores.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I've also added a section discussing some of the
>> >>> concerns
>> >>>>>> around
>> >>>>>>>>>>>>> concurrency, especially in the presence of Iterators.
>> >>> I'm
>> >>>>>>>> thinking of
>> >>>>>>>>>>>>> wrapping WriteBatchWithIndex with a reference-counting
>> >>>>>>>> copy-on-write
>> >>>>>>>>>>>>> implementation (that only makes a copy if there's an
>> >>>> active
>> >>>>>>>>>> iterator),
>> >>>>>>>>>>>> but
>> >>>>>>>>>>>>> I'm open to suggestions.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford <
>> >>>>>>>> nick.telf...@gmail.com>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Hi Colt,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I didn't do any profiling, but the 844
>> >>> implementation:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>     - Writes uncommitted records to a temporary
>> >>> RocksDB
>> >>>>>>>> instance
>> >>>>>>>>>>>>>>        - Since tombstones need to be flagged, all
>> >>> record
>> >>>>>>>> values are
>> >>>>>>>>>>>>>>        prefixed with a value/tombstone marker. This
>> >>>>>>>> necessitates a
>> >>>>>>>>>>>> memory
>> >>>>>>>>>>>>> copy.
>> >>>>>>>>>>>>>>     - On-commit, iterates all records in this
>> >>> temporary
>> >>>>>>>> instance and
>> >>>>>>>>>>>>>>     writes them to the main RocksDB store.
>> >>>>>>>>>>>>>>     - While iterating, the value/tombstone marker
>> >>> needs
>> >>>> to
>> >>>>> be
>> >>>>>>>> parsed
>> >>>>>>>>>>> and
>> >>>>>>>>>>>>>>     the real value extracted. This necessitates
>> >>> another
>> >>>>>> memory
>> >>>>>>>> copy.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> My guess is that the cost of iterating the temporary
>> >>>>> RocksDB
>> >>>>>>>> store
>> >>>>>>>>>> is
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>>> major factor, with the 2 extra memory copies
>> >>> per-Record
>> >>>>>>>>>> contributing
>> >>>>>>>>>>> a
>> >>>>>>>>>>>>>> significant amount too.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:12, Colt McNealy <
>> >>>>>>>> c...@littlehorse.io>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Hi all,
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Out of curiosity, why does the performance of the
>> >>> store
>> >>>>>>>> degrade so
>> >>>>>>>>>>>>>>> significantly with the 844 implementation? I
>> >>> wouldn't
>> >>>> be
>> >>>>>> too
>> >>>>>>>>>>> surprised
>> >>>>>>>>>>>>> by
>> >>>>>>>>>>>>>>> a
>> >>>>>>>>>>>>>>> 50-60% drop (caused by each record being written
>> >>>> twice),
>> >>>>>> but
>> >>>>>>>> 96%
>> >>>>>>>>>> is
>> >>>>>>>>>>>>>>> extreme.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> The only thing I can think of which could create
>> >>> such a
>> >>>>>>>> bottleneck
>> >>>>>>>>>>>> would
>> >>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>> that perhaps the 844 implementation deserializes and
>> >>>> then
>> >>>>>>>>>>>> re-serializes
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>> store values when copying from the uncommitted to
>> >>>>> committed
>> >>>>>>>> store,
>> >>>>>>>>>>>> but I
>> >>>>>>>>>>>>>>> wasn't able to figure that out when I scanned the
>> >>> PR.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Colt McNealy
>> >>>>>>>>>>>>>>> *Founder, LittleHorse.io*
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <
>> >>>>>>>>>>> nick.telf...@gmail.com>
>> >>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> I've updated the KIP to resolve all the points
>> >>> that
>> >>>>> have
>> >>>>>>>> been
>> >>>>>>>>>>> raised
>> >>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>>> far, with one exception: the ALOS default commit
>> >>>>> interval
>> >>>>>>>> of 5
>> >>>>>>>>>>>> minutes
>> >>>>>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>> likely to cause WriteBatchWithIndex memory to grow
>> >>>> too
>> >>>>>>>> large.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> There's a couple of different things I can think
>> >>> of
>> >>>> to
>> >>>>>>>> solve
>> >>>>>>>>>> this:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>     - We already have a memory/record limit in the
>> >>> KIP
>> >>>>> to
>> >>>>>>>> prevent
>> >>>>>>>>>>> OOM
>> >>>>>>>>>>>>>>>>     errors. Should we choose a default value for
>> >>>> these?
>> >>>>> My
>> >>>>>>>>>> concern
>> >>>>>>>>>>>> here
>> >>>>>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>     anything we choose might seem rather
>> >>> arbitrary. We
>> >>>>>> could
>> >>>>>>>>>> change
>> >>>>>>>>>>>>>>>>     its behaviour such that under ALOS, it only
>> >>>> triggers
>> >>>>>> the
>> >>>>>>>>>> commit
>> >>>>>>>>>>>> of
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>     StateStore, but under EOS, it triggers a
>> >>> commit of
>> >>>>> the
>> >>>>>>>> Kafka
>> >>>>>>>>>>>>>>>> transaction.
>> >>>>>>>>>>>>>>>>     - We could introduce a separate `
>> >>>>>> checkpoint.interval.ms`
>> >>>>>>>> to
>> >>>>>>>>>>>> allow
>> >>>>>>>>>>>>>>> ALOS
>> >>>>>>>>>>>>>>>>     to commit the StateStores more frequently than
>> >>> the
>> >>>>>>>> general
>> >>>>>>>>>>>>>>>>     commit.interval.ms? My concern here is that
>> >>> the
>> >>>>>>>> semantics of
>> >>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>> config
>> >>>>>>>>>>>>>>>>     would depend on the processing.mode; under
>> >>> ALOS it
>> >>>>>> would
>> >>>>>>>>>> allow
>> >>>>>>>>>>>> more
>> >>>>>>>>>>>>>>>>     frequently committing stores, whereas under
>> >>> EOS it
>> >>>>>>>> couldn't.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Any better ideas?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 16:25, Nick Telford <
>> >>>>>>>>>>> nick.telf...@gmail.com>
>> >>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Hi Alex,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Thanks for the feedback.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I've updated the discussion of OOM issues by
>> >>>>> describing
>> >>>>>>>> how
>> >>>>>>>>>>> we'll
>> >>>>>>>>>>>>>>> handle
>> >>>>>>>>>>>>>>>>> it. Here's the new text:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> To mitigate this, we will automatically force a
>> >>>> Task
>> >>>>>>>> commit if
>> >>>>>>>>>>> the
>> >>>>>>>>>>>>>>> total
>> >>>>>>>>>>>>>>>>>> uncommitted records returned by
>> >>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedEntries()
>> >>>>>> exceeds a
>> >>>>>>>>>>>> threshold,
>> >>>>>>>>>>>>>>>>>> configured by
>> >>>>> max.uncommitted.state.entries.per.task;
>> >>>>>>>> or the
>> >>>>>>>>>>>> total
>> >>>>>>>>>>>>>>>>>> memory used for buffering uncommitted records
>> >>>>> returned
>> >>>>>>>> by
>> >>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes()
>> >>>> exceeds
>> >>>>>> the
>> >>>>>>>>>>> threshold
>> >>>>>>>>>>>>>>>>>> configured by
>> >>>> max.uncommitted.state.bytes.per.task.
>> >>>>>>>> This will
>> >>>>>>>>>>>>> roughly
>> >>>>>>>>>>>>>>>>>> bound the memory required per-Task for
>> >>> buffering
>> >>>>>>>> uncommitted
>> >>>>>>>>>>>>> records,
>> >>>>>>>>>>>>>>>>>> irrespective of the commit.interval.ms, and
>> >>> will
>> >>>>>>>> effectively
>> >>>>>>>>>>>> bound
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> number of records that will need to be
>> >>> restored in
>> >>>>> the
>> >>>>>>>> event
>> >>>>>>>>>>> of a
>> >>>>>>>>>>>>>>>> failure.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> These limits will be checked in
>> >>> StreamTask#process
>> >>>>> and
>> >>>>>> a
>> >>>>>>>>>>> premature
>> >>>>>>>>>>>>>>> commit
>> >>>>>>>>>>>>>>>>>> will be requested via Task#requestCommit().
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Note that these new methods provide default
>> >>>>>>>> implementations
>> >>>>>>>>>> that
>> >>>>>>>>>>>>>>> ensure
>> >>>>>>>>>>>>>>>>>> existing custom stores and non-transactional
>> >>>> stores
>> >>>>>>>> (e.g.
>> >>>>>>>>>>>>>>>>>> InMemoryKeyValueStore) do not force any early
>> >>>>> commits.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I've chosen to have the StateStore expose
>> >>>>>> approximations
>> >>>>>>>> of
>> >>>>>>>>>> its
>> >>>>>>>>>>>>> buffer
>> >>>>>>>>>>>>>>>>> size/count instead of opaquely requesting a
>> >>> commit
>> >>>> in
>> >>>>>>>> order to
>> >>>>>>>>>>>>>>> delegate
>> >>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> decision making to the Task itself. This enables
>> >>>>> Tasks
>> >>>>>>>> to look
>> >>>>>>>>>>> at
>> >>>>>>>>>>>>>>> *all*
>> >>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>> their StateStores, and determine whether an
>> >>> early
>> >>>>>> commit
>> >>>>>>>> is
>> >>>>>>>>>>>>> necessary.
>> >>>>>>>>>>>>>>>>> Notably, it enables per-Task thresholds,
>> >>> instead of
>> >>>>>>>> per-Store,
>> >>>>>>>>>>>> which
>> >>>>>>>>>>>>>>>>> prevents Tasks with many StateStores from using
>> >>>> much
>> >>>>>> more
>> >>>>>>>>>> memory
>> >>>>>>>>>>>>> than
>> >>>>>>>>>>>>>>>> Tasks
>> >>>>>>>>>>>>>>>>> with one StateStore. This makes sense, since
>> >>>> commits
>> >>>>>> are
>> >>>>>>>> done
>> >>>>>>>>>>>>> by-Task,
>> >>>>>>>>>>>>>>>> not
>> >>>>>>>>>>>>>>>>> by-Store.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Prizes* for anyone who can come up with a better
>> >>>> name
>> >>>>>>>> for the
>> >>>>>>>>>>> new
>> >>>>>>>>>>>>>>> config
>> >>>>>>>>>>>>>>>>> properties!
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Thanks for pointing out the potential
>> >>> performance
>> >>>>>> issues
>> >>>>>>>> of
>> >>>>>>>>>>> WBWI.
>> >>>>>>>>>>>>> From
>> >>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> benchmarks that user posted[1], it looks like
>> >>> WBWI
>> >>>>>> still
>> >>>>>>>>>>> performs
>> >>>>>>>>>>>>>>>>> considerably better than individual puts, which
>> >>> is
>> >>>>> the
>> >>>>>>>>>> existing
>> >>>>>>>>>>>>>>> design,
>> >>>>>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>>>> I'd actually expect a performance boost from
>> >>> WBWI,
>> >>>>> just
>> >>>>>>>> not as
>> >>>>>>>>>>>> great
>> >>>>>>>>>>>>>>> as
>> >>>>>>>>>>>>>>>>> we'd get from a plain WriteBatch. This does
>> >>> suggest
>> >>>>>> that
>> >>>>>>>> a
>> >>>>>>>>>> good
>> >>>>>>>>>>>>>>>>> optimization would be to use a regular
>> >>> WriteBatch
>> >>>> for
>> >>>>>>>>>>> restoration
>> >>>>>>>>>>>>> (in
>> >>>>>>>>>>>>>>>>> RocksDBStore#restoreBatch), since we know that
>> >>>> those
>> >>>>>>>> records
>> >>>>>>>>>>> will
>> >>>>>>>>>>>>>>> never
>> >>>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>> queried before they're committed.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> 1:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>
>> >>>>>
>> >>>
>> https://github.com/adamretter/rocksjava-write-methods-benchmark#results
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> * Just kidding, no prizes, sadly.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 12:28, Alexander
>> >>> Sorokoumov
>> >>>>>>>>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Hey Nick,
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Thank you for the KIP! With such a significant
>> >>>>>>>> performance
>> >>>>>>>>>>>>>>> degradation
>> >>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>> the secondary store approach, we should
>> >>> definitely
>> >>>>>>>> consider
>> >>>>>>>>>>>>>>>>>> WriteBatchWithIndex. I also like encapsulating
>> >>>>>>>> checkpointing
>> >>>>>>>>>>>> inside
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> default state store implementation to improve
>> >>>>>>>> performance.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> +1 to John's comment to keep the current
>> >>>>> checkpointing
>> >>>>>>>> as a
>> >>>>>>>>>>>>> fallback
>> >>>>>>>>>>>>>>>>>> mechanism. We want to keep existing users'
>> >>>> workflows
>> >>>>>>>> intact
>> >>>>>>>>>> if
>> >>>>>>>>>>> we
>> >>>>>>>>>>>>>>> can. A
>> >>>>>>>>>>>>>>>>>> non-intrusive way would be to add a separate
>> >>>>>> StateStore
>> >>>>>>>>>> method,
>> >>>>>>>>>>>>> say,
>> >>>>>>>>>>>>>>>>>> StateStore#managesCheckpointing(), that
>> >>> controls
>> >>>>>>>> whether the
>> >>>>>>>>>>>> state
>> >>>>>>>>>>>>>>> store
>> >>>>>>>>>>>>>>>>>> implementation owns checkpointing.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> I think that a solution to the transactional
>> >>>> writes
>> >>>>>>>> should
>> >>>>>>>>>>>> address
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> OOMEs. One possible way to address that is to
>> >>> wire
>> >>>>>>>>>> StateStore's
>> >>>>>>>>>>>>>>> commit
>> >>>>>>>>>>>>>>>>>> request by adding, say, StateStore#commitNeeded
>> >>>> that
>> >>>>>> is
>> >>>>>>>>>> checked
>> >>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>> StreamTask#commitNeeded via the corresponding
>> >>>>>>>>>>>>> ProcessorStateManager.
>> >>>>>>>>>>>>>>>> With
>> >>>>>>>>>>>>>>>>>> that change, RocksDBStore will have to track
>> >>> the
>> >>>>>> current
>> >>>>>>>>>>>>> transaction
>> >>>>>>>>>>>>>>>> size
>> >>>>>>>>>>>>>>>>>> and request a commit when the size goes over a
>> >>>>>>>> (configurable)
>> >>>>>>>>>>>>>>> threshold.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> AFAIU WriteBatchWithIndex might perform
>> >>>>> significantly
>> >>>>>>>> slower
>> >>>>>>>>>>> than
>> >>>>>>>>>>>>>>>> non-txn
>> >>>>>>>>>>>>>>>>>> puts as the batch size grows [1]. We should
>> >>> have a
>> >>>>>>>>>>> configuration
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>>>> fall
>> >>>>>>>>>>>>>>>>>> back to the current behavior (and/or disable
>> >>> txn
>> >>>>>> stores
>> >>>>>>>> for
>> >>>>>>>>>>> ALOS)
>> >>>>>>>>>>>>>>> unless
>> >>>>>>>>>>>>>>>>>> the benchmarks show negligible overhead for
>> >>> longer
>> >>>>>>>> commits /
>> >>>>>>>>>>>>>>>> large-enough
>> >>>>>>>>>>>>>>>>>> batch sizes.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> If you prefer to keep the KIP smaller, I would
>> >>>>>>>> cut out
>> >>>>>>>>>>>>>>>>>> state-store-managed checkpointing rather than
>> >>>> proper
>> >>>>>>>> OOMe
>> >>>>>>>>>>>> handling
>> >>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>> being able to switch to non-txn behavior. The
>> >>>>>>>> checkpointing
>> >>>>>>>>>> is
>> >>>>>>>>>>>> not
>> >>>>>>>>>>>>>>>>>> necessary to solve the recovery-under-EOS
>> >>> problem.
>> >>>>> On
>> >>>>>>>> the
>> >>>>>>>>>> other
>> >>>>>>>>>>>>> hand,
>> >>>>>>>>>>>>>>>> once
>> >>>>>>>>>>>>>>>>>> WriteBatchWithIndex is in, it will be much
>> >>> easier
>> >>>> to
>> >>>>>> add
>> >>>>>>>>>>>>>>>>>> state-store-managed checkpointing.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> If you share the current implementation, I am
>> >>>> happy
>> >>>>> to
>> >>>>>>>> help
>> >>>>>>>>>> you
>> >>>>>>>>>>>>>>> address
>> >>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> OOMe and configuration parts as well as review
>> >>> and
>> >>>>>> test
>> >>>>>>>> the
>> >>>>>>>>>>>> patch.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>>>>>> Alex
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> 1.
>> >>> https://github.com/facebook/rocksdb/issues/608
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <
>> >>>>>>>>>>>>> nick.telf...@gmail.com
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Hi John,
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Thanks for the review and feedback!
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> 1. Custom Stores: I've been mulling over this
>> >>>>>> problem
>> >>>>>>>>>> myself.
>> >>>>>>>>>>>> As
>> >>>>>>>>>>>>> it
>> >>>>>>>>>>>>>>>>>> stands,
>> >>>>>>>>>>>>>>>>>>> custom stores would essentially lose
>> >>>> checkpointing
>> >>>>>>>> with no
>> >>>>>>>>>>>>>>> indication
>> >>>>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>> they're expected to make changes, besides a
>> >>> line
>> >>>>> in
>> >>>>>>>> the
>> >>>>>>>>>>> release
>> >>>>>>>>>>>>>>>> notes. I
>> >>>>>>>>>>>>>>>>>>> agree that the best solution would be to
>> >>>> provide a
>> >>>>>>>> default
>> >>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>> checkpoints
>> >>>>>>>>>>>>>>>>>>> to a file. The one thing I would change is
>> >>> that
>> >>>>> the
>> >>>>>>>>>>>> checkpointing
>> >>>>>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>> a
>> >>>>>>>>>>>>>>>>>>> store-local file, instead of a per-Task file.
>> >>>> This
>> >>>>>>>> way the
>> >>>>>>>>>>>>>>> StateStore
>> >>>>>>>>>>>>>>>>>> still
>> >>>>>>>>>>>>>>>>>>> technically owns its own checkpointing (via a
>> >>>>>> default
>> >>>>>>>>>>>>>>> implementation),
>> >>>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>> the StateManager/Task execution engine
>> >>> doesn't
>> >>>>> need
>> >>>>>>>> to know
>> >>>>>>>>>>>>>>> anything
>> >>>>>>>>>>>>>>>>>> about
>> >>>>>>>>>>>>>>>>>>> checkpointing, which greatly simplifies some
>> >>> of
>> >>>>> the
>> >>>>>>>> logic.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> 2. OOME errors: The main reasons why I didn't
>> >>>>>> explore
>> >>>>>>>> a
>> >>>>>>>>>>>> solution
>> >>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>> this are
>> >>>>>>>>>>>>>>>>>>> a) to keep this KIP as simple as possible,
>> >>> and
>> >>>> b)
>> >>>>>>>> because
>> >>>>>>>>>> I'm
>> >>>>>>>>>>>> not
>> >>>>>>>>>>>>>>>>>> exactly sure
>> >>>>>>>>>>>>>>>>>>> how to signal that a Task should commit
>> >>>>> prematurely.
>> >>>>>>>> I'm
>> >>>>>>>>>>>>> confident
>> >>>>>>>>>>>>>>>> it's
>> >>>>>>>>>>>>>>>>>>> possible, and I think it's worth adding a
>> >>>> section
>> >>>>> on
>> >>>>>>>>>> handling
>> >>>>>>>>>>>>> this.
>> >>>>>>>>>>>>>>>>>> Besides
>> >>>>>>>>>>>>>>>>>>> my proposal to force an early commit once
>> >>> memory
>> >>>>>> usage
>> >>>>>>>>>>> reaches
>> >>>>>>>>>>>> a
>> >>>>>>>>>>>>>>>>>> threshold,
>> >>>>>>>>>>>>>>>>>>> is there any other approach that you might
>> >>>> suggest
>> >>>>>> for
>> >>>>>>>>>>> tackling
>> >>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>> problem?
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> 3. ALOS: I can add in an explicit paragraph,
>> >>> but
>> >>>>> my
>> >>>>>>>>>>> assumption
>> >>>>>>>>>>>> is
>> >>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>> since transactional behaviour comes at
>> >>> little/no
>> >>>>>>>> cost,
>> >>>>>>>>>>> it
>> >>>>>>>>>>>>>>> should
>> >>>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>>>> available by default on all stores,
>> >>> irrespective
>> >>>>> of
>> >>>>>>>> the
>> >>>>>>>>>>>>> processing
>> >>>>>>>>>>>>>>>> mode.
>> >>>>>>>>>>>>>>>>>>> While ALOS doesn't use transactions, the Task
>> >>>>> itself
>> >>>>>>>> still
>> >>>>>>>>>>>>>>> "commits",
>> >>>>>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>>>>>> the behaviour should be correct under ALOS
>> >>> too.
>> >>>>> I'm
>> >>>>>>>> not
>> >>>>>>>>>>>> convinced
>> >>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>> it's
>> >>>>>>>>>>>>>>>>>>> worth having both
>> >>>> transactional/non-transactional
>> >>>>>>>> stores
>> >>>>>>>>>>>>>>> available, as
>> >>>>>>>>>>>>>>>>>> it
>> >>>>>>>>>>>>>>>>>>> would considerably increase the complexity of
>> >>>> the
>> >>>>>>>> codebase,
>> >>>>>>>>>>> for
>> >>>>>>>>>>>>>>> very
>> >>>>>>>>>>>>>>>>>> little
>> >>>>>>>>>>>>>>>>>>> benefit.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> 4. Method deprecation: Are you referring to
>> >>>>>>>>>>>>>>> StateStore#getPosition()?
>> >>>>>>>>>>>>>>>>>> As I
>> >>>>>>>>>>>>>>>>>>> understand it, Position contains the
>> >>> position of
>> >>>>> the
>> >>>>>>>>>> *source*
>> >>>>>>>>>>>>>>> topics,
>> >>>>>>>>>>>>>>>>>>> whereas the commit offsets would be the
>> >>>>> *changelog*
>> >>>>>>>>>> offsets.
>> >>>>>>>>>>> So
>> >>>>>>>>>>>>>>> it's
>> >>>>>>>>>>>>>>>>>> still
>> >>>>>>>>>>>>>>>>>>> necessary to retain the Position data, as
>> >>> well
>> >>>> as
>> >>>>>> the
>> >>>>>>>>>>> changelog
>> >>>>>>>>>>>>>>>> offsets.
>> >>>>>>>>>>>>>>>>>>> What I meant in the KIP is that Position
>> >>> offsets
>> >>>>> are
>> >>>>>>>>>>> currently
>> >>>>>>>>>>>>>>> stored
>> >>>>>>>>>>>>>>>>>> in a
>> >>>>>>>>>>>>>>>>>>> file, and since we can atomically store
>> >>> metadata
>> >>>>>>>> along with
>> >>>>>>>>>>> the
>> >>>>>>>>>>>>>>> record
>> >>>>>>>>>>>>>>>>>>> batch we commit to RocksDB, we can move our
>> >>>>> Position
>> >>>>>>>>>> offsets
>> >>>>>>>>>>> into
>> >>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>> metadata too, and gain the same transactional
>> >>>>>>>> guarantees
>> >>>>>>>>>> that
>> >>>>>>>>>>>> we
>> >>>>>>>>>>>>>>> will
>> >>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>> changelog offsets, ensuring that the Position
>> >>>>>> offsets
>> >>>>>>>> are
>> >>>>>>>>>>>>>>> consistent
>> >>>>>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>>>> the records that are read from the database.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> On Tue, 22 Nov 2022 at 16:25, John Roesler <
>> >>>>>>>>>>>> vvcep...@apache.org>
>> >>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Thanks for publishing this alternative,
>> >>> Nick!
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> The benchmark you mentioned in the KIP-844
>> >>>>>>>> discussion
>> >>>>>>>>>> seems
>> >>>>>>>>>>>>> like
>> >>>>>>>>>>>>>>> a
>> >>>>>>>>>>>>>>>>>>>> compelling reason to revisit the built-in
>> >>>>>>>>>> transactionality
>> >>>>>>>>>>>>>>>> mechanism.
>> >>>>>>>>>>>>>>>>>> I
>> >>>>>>>>>>>>>>>>>>>> also appreciate your analysis, showing that
>> >>> for
>> >>>>>> most
>> >>>>>>>> use
>> >>>>>>>>>>>> cases,
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> write
>> >>>>>>>>>>>>>>>>>>>> batch approach should be just fine.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> There are a couple of points that would
>> >>> hold
>> >>>> me
>> >>>>>>>> back from
>> >>>>>>>>>>>>>>> approving
>> >>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>> KIP right now:
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> 1. Loss of coverage for custom stores.
>> >>>>>>>>>>>>>>>>>>>> The fact that you can plug in a
>> >>> (relatively)
>> >>>>>> simple
>> >>>>>>>>>>>>>>> implementation
>> >>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> XStateStore interfaces and automagically
>> >>> get a
>> >>>>>>>>>> distributed
>> >>>>>>>>>>>>>>> database
>> >>>>>>>>>>>>>>>>>> out
>> >>>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>> it is a significant benefit of Kafka
>> >>> Streams.
>> >>>>> I'd
>> >>>>>>>> hate to
>> >>>>>>>>>>>> lose
>> >>>>>>>>>>>>>>> it,
>> >>>>>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>>>>> it
>> >>>>>>>>>>>>>>>>>>>> would be better to spend some time and
>> >>> come up
>> >>>>>> with
>> >>>>>>>> a way
>> >>>>>>>>>>> to
>> >>>>>>>>>>>>>>>> preserve
>> >>>>>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>>> property. For example, can we provide a
>> >>>> default
>> >>>>>>>>>>>> implementation
>> >>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>> `commit(..)` that re-implements the
>> >>> existing
>> >>>>>>>>>>> checkpoint-file
>> >>>>>>>>>>>>>>>>>> approach? Or
>> >>>>>>>>>>>>>>>>>>>> perhaps add an `isTransactional()` flag to
>> >>> the
>> >>>>>> state
>> >>>>>>>>>> store
>> >>>>>>>>>>>>>>> interface
>> >>>>>>>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>>>>>>> that the runtime can decide whether to
>> >>>> continue
>> >>>>> to
>> >>>>>>>> manage
>> >>>>>>>>>>>>>>> checkpoint
>> >>>>>>>>>>>>>>>>>>> files
>> >>>>>>>>>>>>>>>>>>>> vs delegating transactionality to the
>> >>> stores?
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> 2. Guarding against OOME
>> >>>>>>>>>>>>>>>>>>>> I appreciate your analysis, but I don't
>> >>> think
>> >>>>> it's
>> >>>>>>>>>>> sufficient
>> >>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>> say
>> >>>>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>>> we will solve the memory problem later if
>> >>> it
>> >>>>>> becomes
>> >>>>>>>>>>>> necessary.
>> >>>>>>>>>>>>>>> The
>> >>>>>>>>>>>>>>>>>>>> experience leading to that situation would
>> >>> be
>> >>>>>> quite
>> >>>>>>>> bad:
>> >>>>>>>>>>>>> Imagine,
>> >>>>>>>>>>>>>>>> you
>> >>>>>>>>>>>>>>>>>>>> upgrade to AK 3.next, your tests pass, so
>> >>> you
>> >>>>>>>> deploy to
>> >>>>>>>>>>>>>>> production.
>> >>>>>>>>>>>>>>>>>> That
>> >>>>>>>>>>>>>>>>>>>> night, you get paged because your app is
>> >>> now
>> >>>>>>>> crashing
>> >>>>>>>>>> with
>> >>>>>>>>>>>>>>> OOMEs. As
>> >>>>>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>>>>> all OOMEs, you'll have a really hard time
>> >>>>> finding
>> >>>>>>>> the
>> >>>>>>>>>> root
>> >>>>>>>>>>>>> cause,
>> >>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>> once
>> >>>>>>>>>>>>>>>>>>>> you do, you won't have a clear path to
>> >>> resolve
>> >>>>> the
>> >>>>>>>> issue.
>> >>>>>>>>>>> You
>> >>>>>>>>>>>>>>> could
>> >>>>>>>>>>>>>>>>>> only
>> >>>>>>>>>>>>>>>>>>>> tune down the commit interval and cache
>> >>> buffer
>> >>>>>> size
>> >>>>>>>> until
>> >>>>>>>>>>> you
>> >>>>>>>>>>>>>>> stop
>> >>>>>>>>>>>>>>>>>>> getting
>> >>>>>>>>>>>>>>>>>>>> crashes.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> FYI, I know of multiple cases where people
>> >>> run
>> >>>>> EOS
>> >>>>>>>> with
>> >>>>>>>>>>> much
>> >>>>>>>>>>>>>>> larger
>> >>>>>>>>>>>>>>>>>>> commit
>> >>>>>>>>>>>>>>>>>>>> intervals to get better batching than the
>> >>>>> default,
>> >>>>>>>> so I
>> >>>>>>>>>>> don't
>> >>>>>>>>>>>>>>> think
>> >>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>> pathological case would be as rare as you
>> >>>>> suspect.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Given that we already have the rudiments
>> >>> of an
>> >>>>>> idea
>> >>>>>>>> of
>> >>>>>>>>>> what
>> >>>>>>>>>>>> we
>> >>>>>>>>>>>>>>> could
>> >>>>>>>>>>>>>>>>>> do
>> >>>>>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>>>> prevent this downside, we should take the
>> >>> time
>> >>>>> to
>> >>>>>>>> design
>> >>>>>>>>>> a
>> >>>>>>>>>>>>>>> solution.
>> >>>>>>>>>>>>>>>>>> We
>> >>>>>>>>>>>>>>>>>>> owe
>> >>>>>>>>>>>>>>>>>>>> it to our users to ensure that awesome new
>> >>>>>> features
>> >>>>>>>> don't
>> >>>>>>>>>>>> come
>> >>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>>>> bitter
>> >>>>>>>>>>>>>>>>>>>> pills unless we can't avoid it.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> 3. ALOS mode.
>> >>>>>>>>>>>>>>>>>>>> On the other hand, I didn't see an
>> >>> indication
>> >>>> of
>> >>>>>> how
>> >>>>>>>>>> stores
>> >>>>>>>>>>>>> will
>> >>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>>>>> handled under ALOS (aka non-EOS) mode.
>> >>>>>>>> Theoretically, the
>> >>>>>>>>>>>>>>>>>>> transactionality
>> >>>>>>>>>>>>>>>>>>>> of the store and the processing mode are
>> >>>>>>>> orthogonal. A
>> >>>>>>>>>>>>>>> transactional
>> >>>>>>>>>>>>>>>>>>> store
>> >>>>>>>>>>>>>>>>>>>> would serve ALOS just as well as a
>> >>>>>>>> non-transactional one
>> >>>>>>>>>>> (if
>> >>>>>>>>>>>>> not
>> >>>>>>>>>>>>>>>>>> better).
>> >>>>>>>>>>>>>>>>>>>> Under ALOS, though, the default commit
>> >>>> interval
>> >>>>> is
>> >>>>>>>> five
>> >>>>>>>>>>>>> minutes,
>> >>>>>>>>>>>>>>> so
>> >>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> memory issue is far more pressing.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> As I see it, we have several options to
>> >>>> resolve
>> >>>>>> this
>> >>>>>>>>>> point.
>> >>>>>>>>>>>> We
>> >>>>>>>>>>>>>>> could
>> >>>>>>>>>>>>>>>>>>>> demonstrate that transactional stores work
>> >>>> just
>> >>>>>>>> fine for
>> >>>>>>>>>>> ALOS
>> >>>>>>>>>>>>>>> and we
>> >>>>>>>>>>>>>>>>>> can
>> >>>>>>>>>>>>>>>>>>>> therefore just swap over unconditionally.
>> >>> We
>> >>>>> could
>> >>>>>>>> also
>> >>>>>>>>>>>> disable
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> transactional mechanism under ALOS so that
>> >>>>> stores
>> >>>>>>>> operate
>> >>>>>>>>>>>> just
>> >>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> same
>> >>>>>>>>>>>>>>>>>>> as
>> >>>>>>>>>>>>>>>>>>>> they do today when run in ALOS mode.
>> >>> Finally,
>> >>>> we
>> >>>>>>>> could do
>> >>>>>>>>>>> the
>> >>>>>>>>>>>>>>> same
>> >>>>>>>>>>>>>>>> as
>> >>>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>> KIP-844 and make transactional stores
>> >>> opt-in
>> >>>>> (it'd
>> >>>>>>>> be
>> >>>>>>>>>>> better
>> >>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>> avoid
>> >>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> extra opt-in mechanism, but it's a good
>> >>>>>>>>>>> get-out-of-jail-free
>> >>>>>>>>>>>>>>> card).
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> 4. (minor point) Deprecation of methods
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> You mentioned that the new `commit` method
>> >>>>>> replaces
>> >>>>>>>>>> flush,
>> >>>>>>>>>>>>>>>>>>>> updateChangelogOffsets, and checkpoint. It
>> >>>> seems
>> >>>>>> to
>> >>>>>>>> me
>> >>>>>>>>>> that
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>> point
>> >>>>>>>>>>>>>>>>>>> about
>> >>>>>>>>>>>>>>>>>>>> atomicity and Position also suggests that
>> >>> it
>> >>>>>>>> replaces the
>> >>>>>>>>>>>>>>> Position
>> >>>>>>>>>>>>>>>>>>>> callbacks. However, the proposal only
>> >>>> deprecates
>> >>>>>>>> `flush`.
>> >>>>>>>>>>>>> Should
>> >>>>>>>>>>>>>>> we
>> >>>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>>>>> deprecating other methods as well?
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Thanks again for the KIP! It's really nice
>> >>>> that
>> >>>>>> you
>> >>>>>>>> and
>> >>>>>>>>>>> Alex
>> >>>>>>>>>>>>> will
>> >>>>>>>>>>>>>>>> get
>> >>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> chance to collaborate on both directions so
>> >>>> that
>> >>>>>> we
>> >>>>>>>> can
>> >>>>>>>>>> get
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>>>> best
>> >>>>>>>>>>>>>>>>>>>> outcome for Streams and its users.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> -John
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> On 2022/11/21 15:02:15 Nick Telford wrote:
>> >>>>>>>>>>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> As I mentioned in the discussion thread
>> >>> for
>> >>>>>>>> KIP-844,
>> >>>>>>>>>> I've
>> >>>>>>>>>>>>> been
>> >>>>>>>>>>>>>>>>>> working
>> >>>>>>>>>>>>>>>>>>> on
>> >>>>>>>>>>>>>>>>>>>>> an alternative approach to achieving
>> >>> better
>> >>>>>>>>>> transactional
>> >>>>>>>>>>>>>>>> semantics
>> >>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>> Kafka Streams StateStores.
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> I've published this separately as
>> >>> KIP-892:
>> >>>>>>>>>> Transactional
>> >>>>>>>>>>>>>>> Semantics
>> >>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>> StateStores
>> >>>>>>>>>>>>>>>>>>>>> <
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
>> >>>>>>>>>>>>>>>>>>>>> ,
>> >>>>>>>>>>>>>>>>>>>>> so that it can be discussed/reviewed
>> >>>>> separately
>> >>>>>>>> from
>> >>>>>>>>>>>> KIP-844.
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Alex: I'm especially interested in what
>> >>> you
>> >>>>>> think!
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> I have a nearly complete implementation
>> >>> of
>> >>>> the
>> >>>>>>>> changes
>> >>>>>>>>>>>>>>> outlined in
>> >>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>>> KIP, please let me know if you'd like me
>> >>> to
>> >>>>> push
>> >>>>>>>> them
>> >>>>>>>>>> for
>> >>>>>>>>>>>>>>> review
>> >>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>> advance
>> >>>>>>>>>>>>>>>>>>>>> of a vote.
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
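
For readers following the quoted thread, the per-Task early-commit check
discussed there (summing the values returned by each store's
approximateNumUncommittedEntries() and approximateNumUncommittedBytes() and
comparing them against max.uncommitted.state.entries.per.task and
max.uncommitted.state.bytes.per.task) could be sketched roughly as below.
This is only an illustration, not the KIP's implementation: the
UncommittedStats interface, the EarlyCommitCheck class and the fixed()
helper are hypothetical names, and treating a negative value as "not
tracked" is an assumption about what the default implementations return.

```java
import java.util.List;

// Hypothetical stand-in for the two StateStore methods named in the thread.
interface UncommittedStats {
    long approximateNumUncommittedEntries();
    long approximateNumUncommittedBytes();
}

final class EarlyCommitCheck {
    private EarlyCommitCheck() { }

    // Sums the approximations over ALL of a Task's stores, so the limits
    // apply per Task rather than per store. Negative values are assumed to
    // mean "not tracked" and contribute nothing to the totals.
    static boolean shouldCommitEarly(List<? extends UncommittedStats> stores,
                                     long maxEntriesPerTask,
                                     long maxBytesPerTask) {
        long entries = 0L;
        long bytes = 0L;
        for (UncommittedStats store : stores) {
            entries += Math.max(0L, store.approximateNumUncommittedEntries());
            bytes += Math.max(0L, store.approximateNumUncommittedBytes());
        }
        return entries > maxEntriesPerTask || bytes > maxBytesPerTask;
    }

    // Hypothetical helper that builds a store reporting fixed values,
    // used only to exercise the check.
    static UncommittedStats fixed(final long entries, final long bytes) {
        return new UncommittedStats() {
            @Override
            public long approximateNumUncommittedEntries() { return entries; }

            @Override
            public long approximateNumUncommittedBytes() { return bytes; }
        };
    }
}
```

In the design described in the thread, a check like this would run in
StreamTask#process, and a true result would lead to a call to
Task#requestCommit(); that wiring is deliberately left out here.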
