Hi everyone,

Quick update: I've added a new section to the KIP, "Offsets for Consumer Rebalances", which outlines my solution to the problem that the StreamsPartitionAssignor needs to read StateStore offsets even when the stores are not currently open.
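In short: on a clean close we flush the memtables and write the store's committed offsets out to the usual .checkpoint file, so the assignor keeps reading checkpoint files exactly as it does today; only after an unclean shutdown do we briefly open the store to recover the offsets it persisted. A rough sketch of that fallback is below — it is illustrative only: getCommittedOffsets() is the method proposed by this KIP (the exact signature may differ), and OffsetCheckpoint is the existing internal checkpoint-file reader.

    import java.io.File;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreContext;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class RebalanceOffsetsSketch {
        // Illustrative: resolve changelog offsets for a store that is not currently open.
        public static Map<TopicPartition, Long> offsetsForClosedStore(final StateStore store,
                                                                      final StateStoreContext context,
                                                                      final File checkpointFile) throws IOException {
            if (checkpointFile.exists()) {
                // Clean shutdown: offsets were written out to the .checkpoint file on close.
                return new OffsetCheckpoint(checkpointFile).read();
            }
            // Unclean shutdown: the only reliable offsets are inside the store itself, so open it
            // just long enough to read them back via the getCommittedOffsets() API proposed here.
            store.init(context, store);
            try {
                return store.getCommittedOffsets();
            } finally {
                store.close();
            }
        }
    }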
Regards, Nick On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> wrote: > Hi Bruno, > > Thanks for reviewing my proposal. > > 1. > The main reason I added it was because it was easy to do. If we see no > value in it, I can remove it. > > 2. > Global StateStores can have multiple partitions in their input topics > (which function as their changelogs), so they would have more than one > partition. > > 3. > That's a good point. At present, the only method it adds is > isolationLevel(), which is likely not necessary outside of StateStores. > It *does* provide slightly different guarantees in the documentation to > several of the methods (hence the overrides). I'm not sure if this is > enough to warrant a new interface though. > I think the question that remains is whether this interface makes it > easier to implement custom transactional StateStores than if we were to > remove it? Probably not. > > 4. > The main motivation for the Atomic Checkpointing is actually performance. > My team has been testing out an implementation of this KIP without it, and > we had problems with RocksDB doing *much* more compaction, due to the > significantly increased flush rate. It was enough of a problem that (for > the time being) we had to revert to Kafka Streams proper. > I think the best way to solve this, as you say, is to keep the .checkpoint > files *in addition* to the offsets being stored within the store itself. > Essentially, when closing StateStores, we force a memtable flush, then > call getCommittedOffsets and write those out to the .checkpoint file. > That would ensure the metadata is available to the > StreamsPartitionAssignor for all closed stores. > If there's a crash (no clean close), then we won't be able to guarantee > which offsets were flushed to disk by RocksDB, so we'd need to open ( > init()), read offsets, and then close() those stores. But since this is > the exception, and will only occur once (provided it doesn't crash every > time!), I think the performance impact here would be acceptable. > > Thanks for the feedback, please let me know if you have any more comments > or questions! > > I'm currently working on rebasing against trunk. This involves adding > support for transactionality to VersionedStateStores. I will probably need > to revise my implementation for transactional "segmented" stores, both to > accommodate VersionedStateStore, and to clean up some other stuff. > > Regards, > Nick > > > On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> wrote: > >> Hi Nick, >> >> Thanks for the updates! >> >> I have a couple of questions/comments. >> >> 1. >> Why do you propose a configuration that involves max. bytes and max. >> records? I think we are mainly concerned about memory consumption because >> we want to limit the off-heap memory used. I cannot think of a case >> where one would want to set the max. number of records. >> >> >> 2. >> Why does >> >> default void commit(final Map<TopicPartition, Long> changelogOffsets) { >> flush(); >> } >> >> take a map of partitions to changelog offsets? >> The mapping between state stores and partitions is a 1:1 relationship. >> Passing in a single changelog offset should suffice. >> >> >> 3. >> Why do we need the Transaction interface? It should be possible to hide >> beginning and committing a transaction within the state store >> implementation, so that from outside the state store, it does not matter >> whether the state store is transactional or not. 
What would be the >> advantage of using the Transaction interface? >> >> >> 4. >> Regarding checkpointing offsets, I think we should keep the checkpoint >> file in any case for the reason you mentioned about rebalancing. Even if >> that would not be an issue, I would propose to move the change to offset >> management to a new KIP and to not add more complexity than needed to >> this one. I would not be too concerned about the consistency violation >> you mention. As far as I understand, with transactional state stores >> Streams would write the checkpoint file during every commit even under >> EOS. In the failure case you describe, Streams would restore the state >> stores from the offsets found in the checkpoint file written during the >> penultimate commit instead of during the last commit. Basically, Streams >> would overwrite the records written to the state store between the last >> two commits with the same records read from the changelogs. While I >> understand that this is wasteful, it is -- at the same time -- >> acceptable and most importantly it does not break EOS. >> >> Best, >> Bruno >> >> >> On 27.04.23 12:34, Nick Telford wrote: >> > Hi everyone, >> > >> > I find myself (again) considering removing the offset management from >> > StateStores, and keeping the old checkpoint file system. The reason is >> that >> > the StreamPartitionAssignor directly reads checkpoint files in order to >> > determine which instance has the most up-to-date copy of the local >> state. >> > If we move offsets into the StateStore itself, then we will need to >> open, >> > initialize, read offsets and then close each StateStore (that is not >> > already assigned and open) for which we have *any* local state, on every >> > rebalance. >> > >> > Generally, I don't think there are many "orphan" stores like this >> sitting >> > around on most instances, but even a few would introduce additional >> latency >> > to an already somewhat lengthy rebalance procedure. >> > >> > I'm leaning towards Colt's (Slack) suggestion of just keeping things in >> the >> > checkpoint file(s) for now, and not worrying about the race. The >> downside >> > is that we wouldn't be able to remove the explicit RocksDB flush >> on-commit, >> > which likely hurts performance. >> > >> > If anyone has any thoughts or ideas on this subject, I would appreciate >> it! >> > >> > Regards, >> > Nick >> > >> > On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com> >> wrote: >> > >> >> Hi Colt, >> >> >> >> The issue is that if there's a crash between 2 and 3, then you still >> end >> >> up with inconsistent data in RocksDB. The only way to guarantee that >> your >> >> checkpoint offsets and locally stored data are consistent with each >> other >> >> are to atomically commit them, which can be achieved by having the >> offsets >> >> stored in RocksDB. >> >> >> >> The offsets column family is likely to be extremely small (one >> >> per-changelog partition + one per Topology input partition for regular >> >> stores, one per input partition for global stores). So the overhead >> will be >> >> minimal. >> >> >> >> A major benefit of doing this is that we can remove the explicit calls >> to >> >> db.flush(), which forcibly flushes memtables to disk on-commit. It >> turns >> >> out, RocksDB memtable flushes are largely dictated by Kafka Streams >> >> commits, *not* RocksDB configuration, which could be a major source of >> >> confusion. 
Atomic checkpointing makes it safe to remove these explicit >> >> flushes, because it no longer matters exactly when RocksDB flushes >> data to >> >> disk; since the data and corresponding checkpoint offsets will always >> be >> >> flushed together, the local store is always in a consistent state, and >> >> on-restart, it can always safely resume restoration from the on-disk >> >> offsets, restoring the small amount of data that hadn't been flushed >> when >> >> the app exited/crashed. >> >> >> >> Regards, >> >> Nick >> >> >> >> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io> >> wrote: >> >> >> >>> Nick, >> >>> >> >>> Thanks for your reply. Ack to A) and B). >> >>> >> >>> For item C), I see what you're referring to. Your proposed solution >> will >> >>> work, so no need to change it. What I was suggesting was that it >> might be >> >>> possible to achieve this with only one column family. So long as: >> >>> >> >>> - No uncommitted records (i.e. not committed to the changelog) are >> >>> *committed* to the state store, AND >> >>> - The Checkpoint offset (which refers to the changelog topic) is >> less >> >>> than or equal to the last written changelog offset in rocksdb >> >>> >> >>> I don't see the need to do the full restoration from scratch. My >> >>> understanding was that prior to 844/892, full restorations were >> required >> >>> because there could be uncommitted records written to RocksDB; >> however, >> >>> given your use of RocksDB transactions, that can be avoided with the >> >>> pattern of 1) commit Kafka transaction, 2) commit RocksDB >> transaction, 3) >> >>> update offset in checkpoint file. >> >>> >> >>> Anyways, your proposed solution works equivalently and I don't believe >> >>> there is much overhead to an additional column family in RocksDB. >> Perhaps >> >>> it may even perform better than making separate writes to the >> checkpoint >> >>> file. >> >>> >> >>> Colt McNealy >> >>> *Founder, LittleHorse.io* >> >>> >> >>> >> >>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <nick.telf...@gmail.com> >> >>> wrote: >> >>> >> >>>> Hi Colt, >> >>>> >> >>>> A. I've done my best to de-couple the StateStore stuff from the rest >> of >> >>> the >> >>>> Streams engine. The fact that there will be only one ongoing (write) >> >>>> transaction at a time is not guaranteed by any API, and is just a >> >>>> consequence of the way Streams operates. To that end, I tried to >> ensure >> >>> the >> >>>> documentation and guarantees provided by the new APIs are >> independent of >> >>>> this incidental behaviour. In practice, you're right, this >> essentially >> >>>> refers to "interactive queries", which are technically "read >> >>> transactions", >> >>>> even if they don't actually use the transaction API to isolate >> >>> themselves. >> >>>> >> >>>> B. Yes, although not ideal. This is for backwards compatibility, >> >>> because: >> >>>> 1) Existing custom StateStore implementations will implement >> >>> flush(), >> >>>> and not commit(), but the Streams engine now calls commit(), so those >> >>> calls >> >>>> need to be forwarded to flush() for these legacy stores. >> >>>> 2) Existing StateStore *users*, i.e. outside of the Streams >> engine >> >>>> itself, may depend on explicitly calling flush(), so for these cases, >> >>>> flush() needs to be redirected to call commit(). >> >>>> If anyone has a better way to guarantee compatibility without >> >>> introducing >> >>>> this potential recursion loop, I'm open to changes! >> >>>> >> >>>> C. 
This is described in the "Atomic Checkpointing" section. Offsets >> are >> >>>> stored in a separate RocksDB column family, which is guaranteed to be >> >>>> atomically flushed to disk with all other column families. The issue >> of >> >>>> checkpoints being written to disk after commit causing inconsistency >> if >> >>> it >> >>>> crashes in between is the reason why, under EOS, checkpoint files are >> >>> only >> >>>> written on clean shutdown. This is one of the major causes of "full >> >>>> restorations", so moving the offsets into a place where they can be >> >>>> guaranteed to be atomically written with the data they checkpoint >> >>> allows us >> >>>> to write the checkpoint offsets *on every commit*, not just on clean >> >>>> shutdown. >> >>>> >> >>>> Regards, >> >>>> Nick >> >>>> >> >>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io> >> wrote: >> >>>> >> >>>>> Nick, >> >>>>> >> >>>>> Thank you for continuing this work. I have a few minor clarifying >> >>>>> questions. >> >>>>> >> >>>>> A) "Records written to any transaction are visible to all other >> >>>>> transactions immediately." I am confused here—I thought there could >> >>> only >> >>>> be >> >>>>> one transaction going on at a time for a given state store given the >> >>>>> threading model for processing records on a Task. Do you mean >> >>> Interactive >> >>>>> Queries by "other transactions"? (If so, then everything makes >> sense—I >> >>>>> thought that since IQ were read-only then they didn't count as >> >>>>> transactions). >> >>>>> >> >>>>> B) Is it intentional that the default implementations of the flush() >> >>> and >> >>>>> commit() methods in the StateStore class refer to each other in some >> >>> sort >> >>>>> of unbounded recursion? >> >>>>> >> >>>>> C) How will the getCommittedOffset() method work? At first I thought >> >>> the >> >>>>> way to do it would be using a special key in the RocksDB store to >> >>> store >> >>>> the >> >>>>> offset, and committing that with the transaction. But upon second >> >>>> thought, >> >>>>> since restoration from the changelog is an idempotent procedure, I >> >>> think >> >>>> it >> >>>>> would be fine to 1) commit the RocksDB transaction and then 2) write >> >>> the >> >>>>> offset to disk in a checkpoint file. If there is a crash between 1) >> >>> and >> >>>> 2), >> >>>>> I think the only downside is now we replay a few more records (at a >> >>> cost >> >>>> of >> >>>>> <100ms). Am I missing something there? >> >>>>> >> >>>>> Other than that, everything makes sense to me. >> >>>>> >> >>>>> Cheers, >> >>>>> Colt McNealy >> >>>>> *Founder, LittleHorse.io* >> >>>>> >> >>>>> >> >>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford < >> nick.telf...@gmail.com> >> >>>>> wrote: >> >>>>> >> >>>>>> Hi everyone, >> >>>>>> >> >>>>>> I've updated the KIP to reflect the latest version of the design: >> >>>>>> >> >>>>>> >> >>>>> >> >>>> >> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores >> >>>>>> >> >>>>>> There are several changes in there that reflect feedback from this >> >>>>> thread, >> >>>>>> and there's a new section and a bunch of interface changes relating >> >>> to >> >>>>>> Atomic Checkpointing, which is the final piece of the puzzle to >> >>> making >> >>>>>> everything robust. >> >>>>>> >> >>>>>> Let me know what you think! 
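To make the Atomic Checkpointing part concrete, here's a rough sketch of the write path (illustrative only — the actual implementation uses WriteBatchWithIndex so that uncommitted writes stay queryable, and the class/parameter names below are made up): the record and the changelog offset it corresponds to go into the same write batch, with offsets living in their own column family, so RocksDB persists both atomically.

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public class AtomicCheckpointSketch {
        // Write one record and the changelog offset it corresponds to in a single atomic batch.
        public static void putWithOffset(final RocksDB db,
                                         final ColumnFamilyHandle dataCf,
                                         final ColumnFamilyHandle offsetsCf,
                                         final byte[] key, final byte[] value,
                                         final byte[] offsetKey, final byte[] offsetValue) throws RocksDBException {
            try (final WriteBatch batch = new WriteBatch();
                 final WriteOptions writeOptions = new WriteOptions()) {
                batch.put(dataCf, key, value);                // the record itself
                batch.put(offsetsCf, offsetKey, offsetValue); // its changelog offset
                db.write(writeOptions, batch);                // atomic across column families
            }
        }
    }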
>> >>>>>> >> >>>>>> Regards, >> >>>>>> Nick >> >>>>>> >> >>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford <nick.telf...@gmail.com> >> >>>>> wrote: >> >>>>>> >> >>>>>>> Hi Lucas, >> >>>>>>> >> >>>>>>> Thanks for looking over my KIP. >> >>>>>>> >> >>>>>>> A) The bound is per-instance, not per-Task. This was a typo in the >> >>>> KIP >> >>>>>>> that I've now corrected. It was originally per-Task, but I >> >>> changed it >> >>>>> to >> >>>>>>> per-instance for exactly the reason you highlighted. >> >>>>>>> B) It's worth noting that transactionality is only enabled under >> >>> EOS, >> >>>>> and >> >>>>>>> in the default mode of operation (ALOS), there should be no >> >>> change in >> >>>>>>> behavior at all. I think, under EOS, we can mitigate the impact on >> >>>>> users >> >>>>>> by >> >>>>>>> sufficiently low default values for the memory bound >> >>> configuration. I >> >>>>>>> understand your hesitation to include a significant change of >> >>>>> behaviour, >> >>>>>>> especially in a minor release, but I suspect that most users will >> >>>>> prefer >> >>>>>>> the memory impact (under EOS) to the existing behaviour of >> >>> frequent >> >>>>> state >> >>>>>>> restorations! If this is a problem, the changes can wait until the >> >>>> next >> >>>>>>> major release. I'll be running a patched version of streams in >> >>>>> production >> >>>>>>> with these changes as soon as they're ready, so it won't disrupt >> >>> me >> >>>> :-D >> >>>>>>> C) The main purpose of this sentence was just to note that some >> >>>> changes >> >>>>>>> will need to be made to the way Segments are handled in order to >> >>>> ensure >> >>>>>>> they also benefit from transactions. At the time I wrote it, I >> >>> hadn't >> >>>>>>> figured out the specific changes necessary, so it was deliberately >> >>>>> vague. >> >>>>>>> This is the one outstanding problem I'm currently working on, and >> >>>> I'll >> >>>>>>> update this section with more detail once I have figured out the >> >>>> exact >> >>>>>>> changes required. >> >>>>>>> D) newTransaction() provides the necessary isolation guarantees. >> >>>> While >> >>>>>>> the RocksDB implementation of transactions doesn't technically >> >>> *need* >> >>>>>>> read-only users to call newTransaction(), other implementations >> >>>> (e.g. a >> >>>>>>> hypothetical PostgresStore) may require it. Calling >> >>> newTransaction() >> >>>>> when >> >>>>>>> no transaction is necessary is essentially free, as it will just >> >>>> return >> >>>>>>> this. >> >>>>>>> >> >>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think it >> >>> should >> >>>> be >> >>>>>>> fairly obvious where the performance problems stem from: writes >> >>> under >> >>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with the >> >>>>>>> tombstone/record flag, 1 to decode it from the tombstone/record >> >>> flag, >> >>>>>> and 1 >> >>>>>>> to copy the record from the "temporary" store to the "main" store, >> >>>> when >> >>>>>> the >> >>>>>>> transaction commits. 
The different approach taken by KIP-869 >> >>> should >> >>>>>> perform >> >>>>>>> much better, as it avoids all these copies, and may actually >> >>> perform >> >>>>>>> slightly better than trunk, due to batched writes in RocksDB >> >>>> performing >> >>>>>>> better than non-batched writes.[1] >> >>>>>>> >> >>>>>>> Regards, >> >>>>>>> Nick >> >>>>>>> >> >>>>>>> 1: >> >>>>>> >> >>>> >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results >> >>>>>>> >> >>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy < >> >>> lbruts...@confluent.io >> >>>>>> .invalid> >> >>>>>>> wrote: >> >>>>>>> >> >>>>>>>> Hi Nick, >> >>>>>>>> >> >>>>>>>> I'm just starting to read up on the whole discussion about >> >>> KIP-892 >> >>>> and >> >>>>>>>> KIP-844. Thanks a lot for your work on this, I do think >> >>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do have some >> >>>>>>>> questions about the latest draft. >> >>>>>>>> >> >>>>>>>> A) If I understand correctly, you propose to put a bound on the >> >>>>>>>> (native) memory consumed by each task. However, I wonder if this >> >>> is >> >>>>>>>> sufficient if we have temporary imbalances in the cluster. For >> >>>>>>>> example, depending on the timing of rebalances during a cluster >> >>>>>>>> restart, it could happen that a single streams node is assigned a >> >>>> lot >> >>>>>>>> more tasks than expected. With your proposed change, this would >> >>> mean >> >>>>>>>> that the memory required by this one node could be a multiple of >> >>>> what >> >>>>>>>> is required during normal operation. I wonder if it wouldn't be >> >>>> safer >> >>>>>>>> to put a global bound on the memory use, across all tasks. >> >>>>>>>> B) Generally, the memory concerns still give me the feeling >> that >> >>>> this >> >>>>>>>> should not be enabled by default for all users in a minor >> >>> release. >> >>>>>>>> C) In section "Transaction Management": the sentence "A similar >> >>>>>>>> analogue will be created to automatically manage `Segment` >> >>>>>>>> transactions.". Maybe this is just me lacking some background, >> >>> but I >> >>>>>>>> do not understand this, it would be great if you could clarify >> >>> what >> >>>>>>>> you mean here. >> >>>>>>>> D) Could you please clarify why IQ has to call >> newTransaction(), >> >>>> when >> >>>>>>>> it's read-only. >> >>>>>>>> >> >>>>>>>> And one last thing not strictly related to your KIP: if there is >> >>> an >> >>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x slower >> >>> (e.g. >> >>>>>>>> by providing a flame graph), that would be quite interesting. >> >>>>>>>> >> >>>>>>>> Cheers, >> >>>>>>>> Lucas >> >>>>>>>> >> >>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford < >> >>>> nick.telf...@gmail.com> >> >>>>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>> Hi everyone, >> >>>>>>>>> >> >>>>>>>>> I've updated the KIP with a more detailed design, which >> >>> reflects >> >>>> the >> >>>>>>>>> implementation I've been working on: >> >>>>>>>>> >> >>>>>>>> >> >>>>>> >> >>>>> >> >>>> >> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores >> >>>>>>>>> >> >>>>>>>>> This new design should address the outstanding points already >> >>> made >> >>>>> in >> >>>>>>>> the >> >>>>>>>>> thread. >> >>>>>>>>> >> >>>>>>>>> Please let me know if there are areas that are unclear or need >> >>>> more >> >>>>>>>>> clarification. >> >>>>>>>>> >> >>>>>>>>> I have a (nearly) working implementation. 
I'm confident that >> >>> the >> >>>>>>>> remaining >> >>>>>>>>> work (making Segments behave) will not impact the documented >> >>>> design. >> >>>>>>>>> >> >>>>>>>>> Regards, >> >>>>>>>>> >> >>>>>>>>> Nick >> >>>>>>>>> >> >>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io >> >>>> >> >>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>>> Nick, >> >>>>>>>>>> >> >>>>>>>>>> Thank you for the reply; that makes sense. I was hoping that, >> >>>>> since >> >>>>>>>> reading >> >>>>>>>>>> uncommitted records from IQ in EOS isn't part of the >> >>> documented >> >>>>> API, >> >>>>>>>> maybe >> >>>>>>>>>> you *wouldn't* have to wait for the next major release to >> >>> make >> >>>>> that >> >>>>>>>> change; >> >>>>>>>>>> but given that it would be considered a major change, I like >> >>>> your >> >>>>>>>> approach >> >>>>>>>>>> the best. >> >>>>>>>>>> >> >>>>>>>>>> Wishing you a speedy recovery and happy coding! >> >>>>>>>>>> >> >>>>>>>>>> Thanks, >> >>>>>>>>>> Colt McNealy >> >>>>>>>>>> *Founder, LittleHorse.io* >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford < >> >>>>>> nick.telf...@gmail.com> >> >>>>>>>>>> wrote: >> >>>>>>>>>> >> >>>>>>>>>>> Hi Colt, >> >>>>>>>>>>> >> >>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended to >> >>> try >> >>>> to >> >>>>>>>> keep the >> >>>>>>>>>>> behaviour unchanged as much as possible, otherwise we'd >> >>> have >> >>>> to >> >>>>>>>> wait for >> >>>>>>>>>> a >> >>>>>>>>>>> major version release to land these changes. >> >>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of >> >>> guarantee, >> >>>>> and >> >>>>>>>> the >> >>>>>>>>>>> typically longer commit intervals would be problematic when >> >>>>>> reading >> >>>>>>>> only >> >>>>>>>>>>> "committed" records. >> >>>>>>>>>>> >> >>>>>>>>>>> I've been away for 5 days recovering from minor surgery, >> >>> but I >> >>>>>>>> spent a >> >>>>>>>>>>> considerable amount of that time working through ideas for >> >>>>>> possible >> >>>>>>>>>>> solutions in my head. I think your suggestion of keeping >> >>> ALOS >> >>>>>>>> as-is, but >> >>>>>>>>>>> buffering writes for EOS is the right path forwards, >> >>> although >> >>>> I >> >>>>>>>> have a >> >>>>>>>>>>> solution that both expands on this, and provides for some >> >>> more >> >>>>>>>> formal >> >>>>>>>>>>> guarantees. >> >>>>>>>>>>> >> >>>>>>>>>>> Essentially, adding support to KeyValueStores for >> >>>>> "Transactions", >> >>>>>>>> with >> >>>>>>>>>>> clearly defined IsolationLevels. Using "Read Committed" >> >>> when >> >>>>> under >> >>>>>>>> EOS, >> >>>>>>>>>> and >> >>>>>>>>>>> "Read Uncommitted" under ALOS. >> >>>>>>>>>>> >> >>>>>>>>>>> The nice thing about this approach is that it gives us much >> >>>> more >> >>>>>>>> clearly >> >>>>>>>>>>> defined isolation behaviour that can be properly >> >>> documented to >> >>>>>>>> ensure >> >>>>>>>>>> users >> >>>>>>>>>>> know what to expect. >> >>>>>>>>>>> >> >>>>>>>>>>> I'm still working out the kinks in the design, and will >> >>> update >> >>>>> the >> >>>>>>>> KIP >> >>>>>>>>>> when >> >>>>>>>>>>> I have something. The main struggle is trying to implement >> >>>> this >> >>>>>>>> without >> >>>>>>>>>>> making any major changes to the existing interfaces or >> >>>> breaking >> >>>>>>>> existing >> >>>>>>>>>>> implementations, because currently everything expects to >> >>>> operate >> >>>>>>>> directly >> >>>>>>>>>>> on a StateStore, and not a Transaction of that store. 
I >> >>> think >> >>>>> I'm >> >>>>>>>> getting >> >>>>>>>>>>> close, although sadly I won't be able to progress much >> >>> until >> >>>>> next >> >>>>>>>> week >> >>>>>>>>>> due >> >>>>>>>>>>> to some work commitments. >> >>>>>>>>>>> >> >>>>>>>>>>> Regards, >> >>>>>>>>>>> Nick >> >>>>>>>>>>> >> >>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy < >> >>>> c...@littlehorse.io> >> >>>>>>>> wrote: >> >>>>>>>>>>> >> >>>>>>>>>>>> Nick, >> >>>>>>>>>>>> >> >>>>>>>>>>>> Thank you for the explanation, and also for the updated >> >>>> KIP. I >> >>>>>> am >> >>>>>>>> quite >> >>>>>>>>>>>> eager for this improvement to be released as it would >> >>>> greatly >> >>>>>>>> reduce >> >>>>>>>>>> the >> >>>>>>>>>>>> operational difficulties of EOS streams apps. >> >>>>>>>>>>>> >> >>>>>>>>>>>> Two questions: >> >>>>>>>>>>>> >> >>>>>>>>>>>> 10) >> >>>>>>>>>>>>> When reading records, we will use the >> >>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB >> >>>>>>>>>>>> and WriteBatchWithIndex#newIteratorWithBase utilities in >> >>>>> order >> >>>>>> to >> >>>>>>>>>> ensure >> >>>>>>>>>>>> that uncommitted writes are available to query. >> >>>>>>>>>>>> Why do extra work to enable the reading of uncommitted >> >>>> writes >> >>>>>>>> during >> >>>>>>>>>> IQ? >> >>>>>>>>>>>> Code complexity aside, reading uncommitted writes is, in >> >>> my >> >>>>>>>> opinion, a >> >>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have the >> >>>>>> guarantee >> >>>>>>>> that, >> >>>>>>>>>>>> with EOS, IQ only reads committed records. In order to >> >>> avoid >> >>>>>> dirty >> >>>>>>>>>> reads, >> >>>>>>>>>>>> one currently must query a standby replica (but this >> >>> still >> >>>>>> doesn't >> >>>>>>>>>> fully >> >>>>>>>>>>>> guarantee monotonic reads). >> >>>>>>>>>>>> >> >>>>>>>>>>>> 20) Is it also necessary to enable this optimization on >> >>> ALOS >> >>>>>>>> stores? >> >>>>>>>>>> The >> >>>>>>>>>>>> motivation of KIP-844 was mainly to reduce the need to >> >>>> restore >> >>>>>>>> state >> >>>>>>>>>> from >> >>>>>>>>>>>> scratch on unclean EOS shutdowns; with ALOS it was >> >>>> acceptable >> >>>>> to >> >>>>>>>> accept >> >>>>>>>>>>>> that there may have been uncommitted writes on disk. On a >> >>>> side >> >>>>>>>> note, if >> >>>>>>>>>>> you >> >>>>>>>>>>>> enable this type of store on ALOS processors, the >> >>> community >> >>>>>> would >> >>>>>>>>>>>> definitely want to enable queries on dirty reads; >> >>> otherwise >> >>>>>> users >> >>>>>>>> would >> >>>>>>>>>>>> have to wait 30 seconds (default) to see an update. >> >>>>>>>>>>>> >> >>>>>>>>>>>> Thank you for doing this fantastic work! >> >>>>>>>>>>>> Colt McNealy >> >>>>>>>>>>>> *Founder, LittleHorse.io* >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford < >> >>>>>>>> nick.telf...@gmail.com> >> >>>>>>>>>>>> wrote: >> >>>>>>>>>>>> >> >>>>>>>>>>>>> Hi everyone, >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no >> >>>> longer >> >>>>>>>> include >> >>>>>>>>>> the >> >>>>>>>>>>>>> StateStore management of checkpointing. This can be >> >>> added >> >>>>> as a >> >>>>>>>> KIP >> >>>>>>>>>>> later >> >>>>>>>>>>>> on >> >>>>>>>>>>>>> to further optimize the consistency and performance of >> >>>> state >> >>>>>>>> stores. >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> I've also added a section discussing some of the >> >>> concerns >> >>>>>> around >> >>>>>>>>>>>>> concurrency, especially in the presence of Iterators. 
>> >>> I'm >> >>>>>>>> thinking of >> >>>>>>>>>>>>> wrapping WriteBatchWithIndex with a reference-counting >> >>>>>>>> copy-on-write >> >>>>>>>>>>>>> implementation (that only makes a copy if there's an >> >>>> active >> >>>>>>>>>> iterator), >> >>>>>>>>>>>> but >> >>>>>>>>>>>>> I'm open to suggestions. >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> Regards, >> >>>>>>>>>>>>> Nick >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford < >> >>>>>>>> nick.telf...@gmail.com> >> >>>>>>>>>>>> wrote: >> >>>>>>>>>>>>> >> >>>>>>>>>>>>>> Hi Colt, >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> I didn't do any profiling, but the 844 >> >>> implementation: >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> - Writes uncommitted records to a temporary >> >>> RocksDB >> >>>>>>>> instance >> >>>>>>>>>>>>>> - Since tombstones need to be flagged, all >> >>> record >> >>>>>>>> values are >> >>>>>>>>>>>>>> prefixed with a value/tombstone marker. This >> >>>>>>>> necessitates a >> >>>>>>>>>>>> memory >> >>>>>>>>>>>>> copy. >> >>>>>>>>>>>>>> - On-commit, iterates all records in this >> >>> temporary >> >>>>>>>> instance and >> >>>>>>>>>>>>>> writes them to the main RocksDB store. >> >>>>>>>>>>>>>> - While iterating, the value/tombstone marker >> >>> needs >> >>>> to >> >>>>> be >> >>>>>>>> parsed >> >>>>>>>>>>> and >> >>>>>>>>>>>>>> the real value extracted. This necessitates >> >>> another >> >>>>>> memory >> >>>>>>>> copy. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> My guess is that the cost of iterating the temporary >> >>>>> RocksDB >> >>>>>>>> store >> >>>>>>>>>> is >> >>>>>>>>>>>> the >> >>>>>>>>>>>>>> major factor, with the 2 extra memory copies >> >>> per-Record >> >>>>>>>>>> contributing >> >>>>>>>>>>> a >> >>>>>>>>>>>>>> significant amount too. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Regards, >> >>>>>>>>>>>>>> Nick >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:12, Colt McNealy < >> >>>>>>>> c...@littlehorse.io> >> >>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Hi all, >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Out of curiosity, why does the performance of the >> >>> store >> >>>>>>>> degrade so >> >>>>>>>>>>>>>>> significantly with the 844 implementation? I >> >>> wouldn't >> >>>> be >> >>>>>> too >> >>>>>>>>>>> surprised >> >>>>>>>>>>>>> by >> >>>>>>>>>>>>>>> a >> >>>>>>>>>>>>>>> 50-60% drop (caused by each record being written >> >>>> twice), >> >>>>>> but >> >>>>>>>> 96% >> >>>>>>>>>> is >> >>>>>>>>>>>>>>> extreme. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> The only thing I can think of which could create >> >>> such a >> >>>>>>>> bottleneck >> >>>>>>>>>>>> would >> >>>>>>>>>>>>>>> be >> >>>>>>>>>>>>>>> that perhaps the 844 implementation deserializes and >> >>>> then >> >>>>>>>>>>>> re-serializes >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>> store values when copying from the uncommitted to >> >>>>> committed >> >>>>>>>> store, >> >>>>>>>>>>>> but I >> >>>>>>>>>>>>>>> wasn't able to figure that out when I scanned the >> >>> PR. 
>> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Colt McNealy >> >>>>>>>>>>>>>>> *Founder, LittleHorse.io* >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford < >> >>>>>>>>>>> nick.telf...@gmail.com> >> >>>>>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Hi everyone, >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> I've updated the KIP to resolve all the points >> >>> that >> >>>>> have >> >>>>>>>> been >> >>>>>>>>>>> raised >> >>>>>>>>>>>>> so >> >>>>>>>>>>>>>>>> far, with one exception: the ALOS default commit >> >>>>> interval >> >>>>>>>> of 5 >> >>>>>>>>>>>> minutes >> >>>>>>>>>>>>>>> is >> >>>>>>>>>>>>>>>> likely to cause WriteBatchWithIndex memory to grow >> >>>> too >> >>>>>>>> large. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> There's a couple of different things I can think >> >>> of >> >>>> to >> >>>>>>>> solve >> >>>>>>>>>> this: >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> - We already have a memory/record limit in the >> >>> KIP >> >>>>> to >> >>>>>>>> prevent >> >>>>>>>>>>> OOM >> >>>>>>>>>>>>>>>> errors. Should we choose a default value for >> >>>> these? >> >>>>> My >> >>>>>>>>>> concern >> >>>>>>>>>>>> here >> >>>>>>>>>>>>>>> is >> >>>>>>>>>>>>>>>> that >> >>>>>>>>>>>>>>>> anything we choose might seem rather >> >>> arbitrary. We >> >>>>>> could >> >>>>>>>>>> change >> >>>>>>>>>>>>>>>> its behaviour such that under ALOS, it only >> >>>> triggers >> >>>>>> the >> >>>>>>>>>> commit >> >>>>>>>>>>>> of >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>> StateStore, but under EOS, it triggers a >> >>> commit of >> >>>>> the >> >>>>>>>> Kafka >> >>>>>>>>>>>>>>>> transaction. >> >>>>>>>>>>>>>>>> - We could introduce a separate ` >> >>>>>> checkpoint.interval.ms` >> >>>>>>>> to >> >>>>>>>>>>>> allow >> >>>>>>>>>>>>>>> ALOS >> >>>>>>>>>>>>>>>> to commit the StateStores more frequently than >> >>> the >> >>>>>>>> general >> >>>>>>>>>>>>>>>> commit.interval.ms? My concern here is that >> >>> the >> >>>>>>>> semantics of >> >>>>>>>>>>>> this >> >>>>>>>>>>>>>>>> config >> >>>>>>>>>>>>>>>> would depend on the processing.mode; under >> >>> ALOS it >> >>>>>> would >> >>>>>>>>>> allow >> >>>>>>>>>>>> more >> >>>>>>>>>>>>>>>> frequently committing stores, whereas under >> >>> EOS it >> >>>>>>>> couldn't. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Any better ideas? >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 16:25, Nick Telford < >> >>>>>>>>>>> nick.telf...@gmail.com> >> >>>>>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Hi Alex, >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Thanks for the feedback. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> I've updated the discussion of OOM issues by >> >>>>> describing >> >>>>>>>> how >> >>>>>>>>>>> we'll >> >>>>>>>>>>>>>>> handle >> >>>>>>>>>>>>>>>>> it. 
Here's the new text: >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> To mitigate this, we will automatically force a >> >>>> Task >> >>>>>>>> commit if >> >>>>>>>>>>> the >> >>>>>>>>>>>>>>> total >> >>>>>>>>>>>>>>>>>> uncommitted records returned by >> >>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedEntries() >> >>>>>> exceeds a >> >>>>>>>>>>>> threshold, >> >>>>>>>>>>>>>>>>>> configured by >> >>>>> max.uncommitted.state.entries.per.task; >> >>>>>>>> or the >> >>>>>>>>>>>> total >> >>>>>>>>>>>>>>>>>> memory used for buffering uncommitted records >> >>>>> returned >> >>>>>>>> by >> >>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes() >> >>>> exceeds >> >>>>>> the >> >>>>>>>>>>> threshold >> >>>>>>>>>>>>>>>>>> configured by >> >>>> max.uncommitted.state.bytes.per.task. >> >>>>>>>> This will >> >>>>>>>>>>>>> roughly >> >>>>>>>>>>>>>>>>>> bound the memory required per-Task for >> >>> buffering >> >>>>>>>> uncommitted >> >>>>>>>>>>>>> records, >> >>>>>>>>>>>>>>>>>> irrespective of the commit.interval.ms, and >> >>> will >> >>>>>>>> effectively >> >>>>>>>>>>>> bound >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>> number of records that will need to be >> >>> restored in >> >>>>> the >> >>>>>>>> event >> >>>>>>>>>>> of a >> >>>>>>>>>>>>>>>> failure. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> These limits will be checked in >> >>> StreamTask#process >> >>>>> and >> >>>>>> a >> >>>>>>>>>>> premature >> >>>>>>>>>>>>>>> commit >> >>>>>>>>>>>>>>>>>> will be requested via Task#requestCommit(). >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Note that these new methods provide default >> >>>>>>>> implementations >> >>>>>>>>>> that >> >>>>>>>>>>>>>>> ensure >> >>>>>>>>>>>>>>>>>> existing custom stores and non-transactional >> >>>> stores >> >>>>>>>> (e.g. >> >>>>>>>>>>>>>>>>>> InMemoryKeyValueStore) do not force any early >> >>>>> commits. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> I've chosen to have the StateStore expose >> >>>>>> approximations >> >>>>>>>> of >> >>>>>>>>>> its >> >>>>>>>>>>>>> buffer >> >>>>>>>>>>>>>>>>> size/count instead of opaquely requesting a >> >>> commit >> >>>> in >> >>>>>>>> order to >> >>>>>>>>>>>>>>> delegate >> >>>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>> decision making to the Task itself. This enables >> >>>>> Tasks >> >>>>>>>> to look >> >>>>>>>>>>> at >> >>>>>>>>>>>>>>> *all* >> >>>>>>>>>>>>>>>> of >> >>>>>>>>>>>>>>>>> their StateStores, and determine whether an >> >>> early >> >>>>>> commit >> >>>>>>>> is >> >>>>>>>>>>>>> necessary. >> >>>>>>>>>>>>>>>>> Notably, it enables pre-Task thresholds, >> >>> instead of >> >>>>>>>> per-Store, >> >>>>>>>>>>>> which >> >>>>>>>>>>>>>>>>> prevents Tasks with many StateStores from using >> >>>> much >> >>>>>> more >> >>>>>>>>>> memory >> >>>>>>>>>>>>> than >> >>>>>>>>>>>>>>>> Tasks >> >>>>>>>>>>>>>>>>> with one StateStore. This makes sense, since >> >>>> commits >> >>>>>> are >> >>>>>>>> done >> >>>>>>>>>>>>> by-Task, >> >>>>>>>>>>>>>>>> not >> >>>>>>>>>>>>>>>>> by-Store. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Prizes* for anyone who can come up with a better >> >>>> name >> >>>>>>>> for the >> >>>>>>>>>>> new >> >>>>>>>>>>>>>>> config >> >>>>>>>>>>>>>>>>> properties! >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Thanks for pointing out the potential >> >>> performance >> >>>>>> issues >> >>>>>>>> of >> >>>>>>>>>>> WBWI. 
>> >>>>>>>>>>>>> From >> >>>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>> benchmarks that user posted[1], it looks like >> >>> WBWI >> >>>>>> still >> >>>>>>>>>>> performs >> >>>>>>>>>>>>>>>>> considerably better than individual puts, which >> >>> is >> >>>>> the >> >>>>>>>>>> existing >> >>>>>>>>>>>>>>> design, >> >>>>>>>>>>>>>>>> so >> >>>>>>>>>>>>>>>>> I'd actually expect a performance boost from >> >>> WBWI, >> >>>>> just >> >>>>>>>> not as >> >>>>>>>>>>>> great >> >>>>>>>>>>>>>>> as >> >>>>>>>>>>>>>>>>> we'd get from a plain WriteBatch. This does >> >>> suggest >> >>>>>> that >> >>>>>>>> a >> >>>>>>>>>> good >> >>>>>>>>>>>>>>>>> optimization would be to use a regular >> >>> WriteBatch >> >>>> for >> >>>>>>>>>>> restoration >> >>>>>>>>>>>>> (in >> >>>>>>>>>>>>>>>>> RocksDBStore#restoreBatch), since we know that >> >>>> those >> >>>>>>>> records >> >>>>>>>>>>> will >> >>>>>>>>>>>>>>> never >> >>>>>>>>>>>>>>>> be >> >>>>>>>>>>>>>>>>> queried before they're committed. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> 1: >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>> >> >>>>> >> >>> >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> * Just kidding, no prizes, sadly. >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 12:28, Alexander >> >>> Sorokoumov >> >>>>>>>>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote: >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> Hey Nick, >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> Thank you for the KIP! With such a significant >> >>>>>>>> performance >> >>>>>>>>>>>>>>> degradation >> >>>>>>>>>>>>>>>> in >> >>>>>>>>>>>>>>>>>> the secondary store approach, we should >> >>> definitely >> >>>>>>>> consider >> >>>>>>>>>>>>>>>>>> WriteBatchWithIndex. I also like encapsulating >> >>>>>>>> checkpointing >> >>>>>>>>>>>> inside >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>> default state store implementation to improve >> >>>>>>>> performance. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> +1 to John's comment to keep the current >> >>>>> checkpointing >> >>>>>>>> as a >> >>>>>>>>>>>>> fallback >> >>>>>>>>>>>>>>>>>> mechanism. We want to keep existing users' >> >>>> workflows >> >>>>>>>> intact >> >>>>>>>>>> if >> >>>>>>>>>>> we >> >>>>>>>>>>>>>>> can. A >> >>>>>>>>>>>>>>>>>> non-intrusive way would be to add a separate >> >>>>>> StateStore >> >>>>>>>>>> method, >> >>>>>>>>>>>>> say, >> >>>>>>>>>>>>>>>>>> StateStore#managesCheckpointing(), that >> >>> controls >> >>>>>>>> whether the >> >>>>>>>>>>>> state >> >>>>>>>>>>>>>>> store >> >>>>>>>>>>>>>>>>>> implementation owns checkpointing. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> I think that a solution to the transactional >> >>>> writes >> >>>>>>>> should >> >>>>>>>>>>>> address >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>> OOMEs. One possible way to address that is to >> >>> wire >> >>>>>>>>>> StateStore's >> >>>>>>>>>>>>>>> commit >> >>>>>>>>>>>>>>>>>> request by adding, say, StateStore#commitNeeded >> >>>> that >> >>>>>> is >> >>>>>>>>>> checked >> >>>>>>>>>>>> in >> >>>>>>>>>>>>>>>>>> StreamTask#commitNeeded via the corresponding >> >>>>>>>>>>>>> ProcessorStateManager. >> >>>>>>>>>>>>>>>> With >> >>>>>>>>>>>>>>>>>> that change, RocksDBStore will have to track >> >>> the >> >>>>>> current >> >>>>>>>>>>>>> transaction >> >>>>>>>>>>>>>>>> size >> >>>>>>>>>>>>>>>>>> and request a commit when the size goes over a >> >>>>>>>> (configurable) >> >>>>>>>>>>>>>>> threshold. 
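Roughly, I imagine something like the following (just a sketch — the class and field names are made up, and the actual threshold would come from a new config): the store tracks how much uncommitted data its write batch is buffering and exposes whether a commit should be forced.

    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;

    public class UncommittedSizeTrackingStore {
        private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
        private final long maxUncommittedBytes; // the (configurable) threshold
        private long uncommittedBytes = 0;

        public UncommittedSizeTrackingStore(final long maxUncommittedBytes) {
            this.maxUncommittedBytes = maxUncommittedBytes;
        }

        public void put(final byte[] key, final byte[] value) throws RocksDBException {
            batch.put(key, value);
            uncommittedBytes += key.length + (value == null ? 0 : value.length);
        }

        // Wired into the task's commitNeeded() check to request an early commit.
        public boolean commitNeeded() {
            return uncommittedBytes >= maxUncommittedBytes;
        }

        // On commit the batch would be written to RocksDB (elided here), then reset.
        public void commit() {
            batch.clear();
            uncommittedBytes = 0;
        }
    }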
>> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> AFAIU WriteBatchWithIndex might perform >> >>>>> significantly >> >>>>>>>> slower >> >>>>>>>>>>> than >> >>>>>>>>>>>>>>>> non-txn >> >>>>>>>>>>>>>>>>>> puts as the batch size grows [1]. We should >> >>> have a >> >>>>>>>>>>> configuration >> >>>>>>>>>>>> to >> >>>>>>>>>>>>>>> fall >> >>>>>>>>>>>>>>>>>> back to the current behavior (and/or disable >> >>> txn >> >>>>>> stores >> >>>>>>>> for >> >>>>>>>>>>> ALOS) >> >>>>>>>>>>>>>>> unless >> >>>>>>>>>>>>>>>>>> the benchmarks show negligible overhead for >> >>> longer >> >>>>>>>> commits / >> >>>>>>>>>>>>>>>> large-enough >> >>>>>>>>>>>>>>>>>> batch sizes. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> If you prefer to keep the KIP smaller, I would >> >>>>> rather >> >>>>>>>> cut out >> >>>>>>>>>>>>>>>>>> state-store-managed checkpointing rather than >> >>>> proper >> >>>>>>>> OOMe >> >>>>>>>>>>>> handling >> >>>>>>>>>>>>>>> and >> >>>>>>>>>>>>>>>>>> being able to switch to non-txn behavior. The >> >>>>>>>> checkpointing >> >>>>>>>>>> is >> >>>>>>>>>>>> not >> >>>>>>>>>>>>>>>>>> necessary to solve the recovery-under-EOS >> >>> problem. >> >>>>> On >> >>>>>>>> the >> >>>>>>>>>> other >> >>>>>>>>>>>>> hand, >> >>>>>>>>>>>>>>>> once >> >>>>>>>>>>>>>>>>>> WriteBatchWithIndex is in, it will be much >> >>> easier >> >>>> to >> >>>>>> add >> >>>>>>>>>>>>>>>>>> state-store-managed checkpointing. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> If you share the current implementation, I am >> >>>> happy >> >>>>> to >> >>>>>>>> help >> >>>>>>>>>> you >> >>>>>>>>>>>>>>> address >> >>>>>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>> OOMe and configuration parts as well as review >> >>> and >> >>>>>> test >> >>>>>>>> the >> >>>>>>>>>>>> patch. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> Best, >> >>>>>>>>>>>>>>>>>> Alex >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> 1. >> >>> https://github.com/facebook/rocksdb/issues/608 >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford < >> >>>>>>>>>>>>> nick.telf...@gmail.com >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> Hi John, >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> Thanks for the review and feedback! >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> 1. Custom Stores: I've been mulling over this >> >>>>>> problem >> >>>>>>>>>> myself. >> >>>>>>>>>>>> As >> >>>>>>>>>>>>> it >> >>>>>>>>>>>>>>>>>> stands, >> >>>>>>>>>>>>>>>>>>> custom stores would essentially lose >> >>>> checkpointing >> >>>>>>>> with no >> >>>>>>>>>>>>>>> indication >> >>>>>>>>>>>>>>>>>> that >> >>>>>>>>>>>>>>>>>>> they're expected to make changes, besides a >> >>> line >> >>>>> in >> >>>>>>>> the >> >>>>>>>>>>> release >> >>>>>>>>>>>>>>>> notes. I >> >>>>>>>>>>>>>>>>>>> agree that the best solution would be to >> >>>> provide a >> >>>>>>>> default >> >>>>>>>>>>> that >> >>>>>>>>>>>>>>>>>> checkpoints >> >>>>>>>>>>>>>>>>>>> to a file. The one thing I would change is >> >>> that >> >>>>> the >> >>>>>>>>>>>> checkpointing >> >>>>>>>>>>>>>>> is >> >>>>>>>>>>>>>>>> to >> >>>>>>>>>>>>>>>>>> a >> >>>>>>>>>>>>>>>>>>> store-local file, instead of a per-Task file. 
>> >>>> This >> >>>>>>>> way the >> >>>>>>>>>>>>>>> StateStore >> >>>>>>>>>>>>>>>>>> still >> >>>>>>>>>>>>>>>>>>> technically owns its own checkpointing (via a >> >>>>>> default >> >>>>>>>>>>>>>>> implementation), >> >>>>>>>>>>>>>>>>>> and >> >>>>>>>>>>>>>>>>>>> the StateManager/Task execution engine >> >>> doesn't >> >>>>> need >> >>>>>>>> to know >> >>>>>>>>>>>>>>> anything >> >>>>>>>>>>>>>>>>>> about >> >>>>>>>>>>>>>>>>>>> checkpointing, which greatly simplifies some >> >>> of >> >>>>> the >> >>>>>>>> logic. >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> 2. OOME errors: The main reasons why I didn't >> >>>>>> explore >> >>>>>>>> a >> >>>>>>>>>>>> solution >> >>>>>>>>>>>>> to >> >>>>>>>>>>>>>>>>>> this is >> >>>>>>>>>>>>>>>>>>> a) to keep this KIP as simple as possible, >> >>> and >> >>>> b) >> >>>>>>>> because >> >>>>>>>>>> I'm >> >>>>>>>>>>>> not >> >>>>>>>>>>>>>>>>>> exactly >> >>>>>>>>>>>>>>>>>>> how to signal that a Task should commit >> >>>>> prematurely. >> >>>>>>>> I'm >> >>>>>>>>>>>>> confident >> >>>>>>>>>>>>>>>> it's >> >>>>>>>>>>>>>>>>>>> possible, and I think it's worth adding a >> >>>> section >> >>>>> on >> >>>>>>>>>> handling >> >>>>>>>>>>>>> this. >> >>>>>>>>>>>>>>>>>> Besides >> >>>>>>>>>>>>>>>>>>> my proposal to force an early commit once >> >>> memory >> >>>>>> usage >> >>>>>>>>>>> reaches >> >>>>>>>>>>>> a >> >>>>>>>>>>>>>>>>>> threshold, >> >>>>>>>>>>>>>>>>>>> is there any other approach that you might >> >>>> suggest >> >>>>>> for >> >>>>>>>>>>> tackling >> >>>>>>>>>>>>>>> this >> >>>>>>>>>>>>>>>>>>> problem? >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> 3. ALOS: I can add in an explicit paragraph, >> >>> but >> >>>>> my >> >>>>>>>>>>> assumption >> >>>>>>>>>>>> is >> >>>>>>>>>>>>>>> that >> >>>>>>>>>>>>>>>>>>> since transactional behaviour comes at >> >>> little/no >> >>>>>>>> cost, that >> >>>>>>>>>>> it >> >>>>>>>>>>>>>>> should >> >>>>>>>>>>>>>>>> be >> >>>>>>>>>>>>>>>>>>> available by default on all stores, >> >>> irrespective >> >>>>> of >> >>>>>>>> the >> >>>>>>>>>>>>> processing >> >>>>>>>>>>>>>>>> mode. >> >>>>>>>>>>>>>>>>>>> While ALOS doesn't use transactions, the Task >> >>>>> itself >> >>>>>>>> still >> >>>>>>>>>>>>>>> "commits", >> >>>>>>>>>>>>>>>> so >> >>>>>>>>>>>>>>>>>>> the behaviour should be correct under ALOS >> >>> too. >> >>>>> I'm >> >>>>>>>> not >> >>>>>>>>>>>> convinced >> >>>>>>>>>>>>>>> that >> >>>>>>>>>>>>>>>>>> it's >> >>>>>>>>>>>>>>>>>>> worth having both >> >>>> transactional/non-transactional >> >>>>>>>> stores >> >>>>>>>>>>>>>>> available, as >> >>>>>>>>>>>>>>>>>> it >> >>>>>>>>>>>>>>>>>>> would considerably increase the complexity of >> >>>> the >> >>>>>>>> codebase, >> >>>>>>>>>>> for >> >>>>>>>>>>>>>>> very >> >>>>>>>>>>>>>>>>>> little >> >>>>>>>>>>>>>>>>>>> benefit. >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> 4. Method deprecation: Are you referring to >> >>>>>>>>>>>>>>> StateStore#getPosition()? >> >>>>>>>>>>>>>>>>>> As I >> >>>>>>>>>>>>>>>>>>> understand it, Position contains the >> >>> position of >> >>>>> the >> >>>>>>>>>> *source* >> >>>>>>>>>>>>>>> topics, >> >>>>>>>>>>>>>>>>>>> whereas the commit offsets would be the >> >>>>> *changelog* >> >>>>>>>>>> offsets. >> >>>>>>>>>>> So >> >>>>>>>>>>>>>>> it's >> >>>>>>>>>>>>>>>>>> still >> >>>>>>>>>>>>>>>>>>> necessary to retain the Position data, as >> >>> well >> >>>> as >> >>>>>> the >> >>>>>>>>>>> changelog >> >>>>>>>>>>>>>>>> offsets. 
>> >>>>>>>>>>>>>>>>>>> What I meant in the KIP is that Position >> >>> offsets >> >>>>> are >> >>>>>>>>>>> currently >> >>>>>>>>>>>>>>> stored >> >>>>>>>>>>>>>>>>>> in a >> >>>>>>>>>>>>>>>>>>> file, and since we can atomically store >> >>> metadata >> >>>>>>>> along with >> >>>>>>>>>>> the >> >>>>>>>>>>>>>>> record >> >>>>>>>>>>>>>>>>>>> batch we commit to RocksDB, we can move our >> >>>>> Position >> >>>>>>>>>> offsets >> >>>>>>>>>>> in >> >>>>>>>>>>>>> to >> >>>>>>>>>>>>>>>> this >> >>>>>>>>>>>>>>>>>>> metadata too, and gain the same transactional >> >>>>>>>> guarantees >> >>>>>>>>>> that >> >>>>>>>>>>>> we >> >>>>>>>>>>>>>>> will >> >>>>>>>>>>>>>>>>>> for >> >>>>>>>>>>>>>>>>>>> changelog offsets, ensuring that the Position >> >>>>>> offsets >> >>>>>>>> are >> >>>>>>>>>>>>>>> consistent >> >>>>>>>>>>>>>>>>>> with >> >>>>>>>>>>>>>>>>>>> the records that are read from the database. >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> Regards, >> >>>>>>>>>>>>>>>>>>> Nick >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> On Tue, 22 Nov 2022 at 16:25, John Roesler < >> >>>>>>>>>>>> vvcep...@apache.org> >> >>>>>>>>>>>>>>>> wrote: >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> Thanks for publishing this alternative, >> >>> Nick! >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> The benchmark you mentioned in the KIP-844 >> >>>>>>>> discussion >> >>>>>>>>>> seems >> >>>>>>>>>>>>> like >> >>>>>>>>>>>>>>> a >> >>>>>>>>>>>>>>>>>>>> compelling reason to revisit the built-in >> >>>>>>>>>> transactionality >> >>>>>>>>>>>>>>>> mechanism. >> >>>>>>>>>>>>>>>>>> I >> >>>>>>>>>>>>>>>>>>>> also appreciate you analysis, showing that >> >>> for >> >>>>>> most >> >>>>>>>> use >> >>>>>>>>>>>> cases, >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>> write >> >>>>>>>>>>>>>>>>>>>> batch approach should be just fine. >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> There are a couple of points that would >> >>> hold >> >>>> me >> >>>>>>>> back from >> >>>>>>>>>>>>>>> approving >> >>>>>>>>>>>>>>>>>> this >> >>>>>>>>>>>>>>>>>>>> KIP right now: >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> 1. Loss of coverage for custom stores. >> >>>>>>>>>>>>>>>>>>>> The fact that you can plug in a >> >>> (relatively) >> >>>>>> simple >> >>>>>>>>>>>>>>> implementation >> >>>>>>>>>>>>>>>> of >> >>>>>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>>>> XStateStore interfaces and automagically >> >>> get a >> >>>>>>>>>> distributed >> >>>>>>>>>>>>>>> database >> >>>>>>>>>>>>>>>>>> out >> >>>>>>>>>>>>>>>>>>> of >> >>>>>>>>>>>>>>>>>>>> it is a significant benefit of Kafka >> >>> Streams. >> >>>>> I'd >> >>>>>>>> hate to >> >>>>>>>>>>>> lose >> >>>>>>>>>>>>>>> it, >> >>>>>>>>>>>>>>>> so >> >>>>>>>>>>>>>>>>>> it >> >>>>>>>>>>>>>>>>>>>> would be better to spend some time and >> >>> come up >> >>>>>> with >> >>>>>>>> a way >> >>>>>>>>>>> to >> >>>>>>>>>>>>>>>> preserve >> >>>>>>>>>>>>>>>>>>> that >> >>>>>>>>>>>>>>>>>>>> property. For example, can we provide a >> >>>> default >> >>>>>>>>>>>> implementation >> >>>>>>>>>>>>> of >> >>>>>>>>>>>>>>>>>>>> `commit(..)` that re-implements the >> >>> existing >> >>>>>>>>>>> checkpoint-file >> >>>>>>>>>>>>>>>>>> approach? 
Or >> >>>>>>>>>>>>>>>>>>>> perhaps add an `isTransactional()` flag to >> >>> the >> >>>>>> state >> >>>>>>>>>> store >> >>>>>>>>>>>>>>> interface >> >>>>>>>>>>>>>>>>>> so >> >>>>>>>>>>>>>>>>>>>> that the runtime can decide whether to >> >>>> continue >> >>>>> to >> >>>>>>>> manage >> >>>>>>>>>>>>>>> checkpoint >> >>>>>>>>>>>>>>>>>>> files >> >>>>>>>>>>>>>>>>>>>> vs delegating transactionality to the >> >>> stores? >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> 2. Guarding against OOME >> >>>>>>>>>>>>>>>>>>>> I appreciate your analysis, but I don't >> >>> think >> >>>>> it's >> >>>>>>>>>>> sufficient >> >>>>>>>>>>>>> to >> >>>>>>>>>>>>>>> say >> >>>>>>>>>>>>>>>>>> that >> >>>>>>>>>>>>>>>>>>>> we will solve the memory problem later if >> >>> it >> >>>>>> becomes >> >>>>>>>>>>>> necessary. >> >>>>>>>>>>>>>>> The >> >>>>>>>>>>>>>>>>>>>> experience leading to that situation would >> >>> be >> >>>>>> quite >> >>>>>>>> bad: >> >>>>>>>>>>>>> Imagine, >> >>>>>>>>>>>>>>>> you >> >>>>>>>>>>>>>>>>>>>> upgrade to AK 3.next, your tests pass, so >> >>> you >> >>>>>>>> deploy to >> >>>>>>>>>>>>>>> production. >> >>>>>>>>>>>>>>>>>> That >> >>>>>>>>>>>>>>>>>>>> night, you get paged because your app is >> >>> now >> >>>>>>>> crashing >> >>>>>>>>>> with >> >>>>>>>>>>>>>>> OOMEs. As >> >>>>>>>>>>>>>>>>>> with >> >>>>>>>>>>>>>>>>>>>> all OOMEs, you'll have a really hard time >> >>>>> finding >> >>>>>>>> the >> >>>>>>>>>> root >> >>>>>>>>>>>>> cause, >> >>>>>>>>>>>>>>>> and >> >>>>>>>>>>>>>>>>>>> once >> >>>>>>>>>>>>>>>>>>>> you do, you won't have a clear path to >> >>> resolve >> >>>>> the >> >>>>>>>> issue. >> >>>>>>>>>>> You >> >>>>>>>>>>>>>>> could >> >>>>>>>>>>>>>>>>>> only >> >>>>>>>>>>>>>>>>>>>> tune down the commit interval and cache >> >>> buffer >> >>>>>> size >> >>>>>>>> until >> >>>>>>>>>>> you >> >>>>>>>>>>>>>>> stop >> >>>>>>>>>>>>>>>>>>> getting >> >>>>>>>>>>>>>>>>>>>> crashes. >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> FYI, I know of multiple cases where people >> >>> run >> >>>>> EOS >> >>>>>>>> with >> >>>>>>>>>>> much >> >>>>>>>>>>>>>>> larger >> >>>>>>>>>>>>>>>>>>> commit >> >>>>>>>>>>>>>>>>>>>> intervals to get better batching than the >> >>>>> default, >> >>>>>>>> so I >> >>>>>>>>>>> don't >> >>>>>>>>>>>>>>> think >> >>>>>>>>>>>>>>>>>> this >> >>>>>>>>>>>>>>>>>>>> pathological case would be as rare as you >> >>>>> suspect. >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> Given that we already have the rudiments >> >>> of an >> >>>>>> idea >> >>>>>>>> of >> >>>>>>>>>> what >> >>>>>>>>>>>> we >> >>>>>>>>>>>>>>> could >> >>>>>>>>>>>>>>>>>> do >> >>>>>>>>>>>>>>>>>>> to >> >>>>>>>>>>>>>>>>>>>> prevent this downside, we should take the >> >>> time >> >>>>> to >> >>>>>>>> design >> >>>>>>>>>> a >> >>>>>>>>>>>>>>> solution. >> >>>>>>>>>>>>>>>>>> We >> >>>>>>>>>>>>>>>>>>> owe >> >>>>>>>>>>>>>>>>>>>> it to our users to ensure that awesome new >> >>>>>> features >> >>>>>>>> don't >> >>>>>>>>>>>> come >> >>>>>>>>>>>>>>> with >> >>>>>>>>>>>>>>>>>>> bitter >> >>>>>>>>>>>>>>>>>>>> pills unless we can't avoid it. >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> 3. ALOS mode. >> >>>>>>>>>>>>>>>>>>>> On the other hand, I didn't see an >> >>> indication >> >>>> of >> >>>>>> how >> >>>>>>>>>> stores >> >>>>>>>>>>>>> will >> >>>>>>>>>>>>>>> be >> >>>>>>>>>>>>>>>>>>>> handled under ALOS (aka non-EOS) mode. >> >>>>>>>> Theoretically, the >> >>>>>>>>>>>>>>>>>>> transactionality >> >>>>>>>>>>>>>>>>>>>> of the store and the processing mode are >> >>>>>>>> orthogonal. 
A >> >>>>>>>>>>>>>>> transactional >> >>>>>>>>>>>>>>>>>>> store >> >>>>>>>>>>>>>>>>>>>> would serve ALOS just as well as a >> >>>>>>>> non-transactional one >> >>>>>>>>>>> (if >> >>>>>>>>>>>>> not >> >>>>>>>>>>>>>>>>>> better). >> >>>>>>>>>>>>>>>>>>>> Under ALOS, though, the default commit >> >>>> interval >> >>>>> is >> >>>>>>>> five >> >>>>>>>>>>>>> minutes, >> >>>>>>>>>>>>>>> so >> >>>>>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>>>> memory issue is far more pressing. >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> As I see it, we have several options to >> >>>> resolve >> >>>>>> this >> >>>>>>>>>> point. >> >>>>>>>>>>>> We >> >>>>>>>>>>>>>>> could >> >>>>>>>>>>>>>>>>>>>> demonstrate that transactional stores work >> >>>> just >> >>>>>>>> fine for >> >>>>>>>>>>> ALOS >> >>>>>>>>>>>>>>> and we >> >>>>>>>>>>>>>>>>>> can >> >>>>>>>>>>>>>>>>>>>> therefore just swap over unconditionally. >> >>> We >> >>>>> could >> >>>>>>>> also >> >>>>>>>>>>>> disable >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>>>> transactional mechanism under ALOS so that >> >>>>> stores >> >>>>>>>> operate >> >>>>>>>>>>>> just >> >>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>> same >> >>>>>>>>>>>>>>>>>>> as >> >>>>>>>>>>>>>>>>>>>> they do today when run in ALOS mode. >> >>> Finally, >> >>>> we >> >>>>>>>> could do >> >>>>>>>>>>> the >> >>>>>>>>>>>>>>> same >> >>>>>>>>>>>>>>>> as >> >>>>>>>>>>>>>>>>>> in >> >>>>>>>>>>>>>>>>>>>> KIP-844 and make transactional stores >> >>> opt-in >> >>>>> (it'd >> >>>>>>>> be >> >>>>>>>>>>> better >> >>>>>>>>>>>> to >> >>>>>>>>>>>>>>>> avoid >> >>>>>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>>>> extra opt-in mechanism, but it's a good >> >>>>>>>>>>> get-out-of-jail-free >> >>>>>>>>>>>>>>> card). >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> 4. (minor point) Deprecation of methods >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> You mentioned that the new `commit` method >> >>>>>> replaces >> >>>>>>>>>> flush, >> >>>>>>>>>>>>>>>>>>>> updateChangelogOffsets, and checkpoint. It >> >>>> seems >> >>>>>> to >> >>>>>>>> me >> >>>>>>>>>> that >> >>>>>>>>>>>> the >> >>>>>>>>>>>>>>>> point >> >>>>>>>>>>>>>>>>>>> about >> >>>>>>>>>>>>>>>>>>>> atomicity and Position also suggests that >> >>> it >> >>>>>>>> replaces the >> >>>>>>>>>>>>>>> Position >> >>>>>>>>>>>>>>>>>>>> callbacks. However, the proposal only >> >>>> deprecates >> >>>>>>>> `flush`. >> >>>>>>>>>>>>> Should >> >>>>>>>>>>>>>>> we >> >>>>>>>>>>>>>>>> be >> >>>>>>>>>>>>>>>>>>>> deprecating other methods as well? >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> Thanks again for the KIP! It's really nice >> >>>> that >> >>>>>> you >> >>>>>>>> and >> >>>>>>>>>>> Alex >> >>>>>>>>>>>>> will >> >>>>>>>>>>>>>>>> get >> >>>>>>>>>>>>>>>>>> the >> >>>>>>>>>>>>>>>>>>>> chance to collaborate on both directions so >> >>>> that >> >>>>>> we >> >>>>>>>> can >> >>>>>>>>>> get >> >>>>>>>>>>>> the >> >>>>>>>>>>>>>>> best >> >>>>>>>>>>>>>>>>>>>> outcome for Streams and its users. 
>> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> -John >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> On 2022/11/21 15:02:15 Nick Telford wrote: >> >>>>>>>>>>>>>>>>>>>>> Hi everyone, >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> As I mentioned in the discussion thread >> >>> for >> >>>>>>>> KIP-844, >> >>>>>>>>>> I've >> >>>>>>>>>>>>> been >> >>>>>>>>>>>>>>>>>> working >> >>>>>>>>>>>>>>>>>>> on >> >>>>>>>>>>>>>>>>>>>>> an alternative approach to achieving >> >>> better >> >>>>>>>>>> transactional >> >>>>>>>>>>>>>>>> semantics >> >>>>>>>>>>>>>>>>>> for >> >>>>>>>>>>>>>>>>>>>>> Kafka Streams StateStores. >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> I've published this separately as >> >>> KIP-892: >> >>>>>>>>>> Transactional >> >>>>>>>>>>>>>>> Semantics >> >>>>>>>>>>>>>>>>>> for >> >>>>>>>>>>>>>>>>>>>>> StateStores >> >>>>>>>>>>>>>>>>>>>>> < >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>> >> >>>>>> >> >>>>> >> >>>> >> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores >> >>>>>>>>>>>>>>>>>>>>> , >> >>>>>>>>>>>>>>>>>>>>> so that it can be discussed/reviewed >> >>>>> separately >> >>>>>>>> from >> >>>>>>>>>>>> KIP-844. >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> Alex: I'm especially interested in what >> >>> you >> >>>>>> think! >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> I have a nearly complete implementation >> >>> of >> >>>> the >> >>>>>>>> changes >> >>>>>>>>>>>>>>> outlined in >> >>>>>>>>>>>>>>>>>> this >> >>>>>>>>>>>>>>>>>>>>> KIP, please let me know if you'd like me >> >>> to >> >>>>> push >> >>>>>>>> them >> >>>>>>>>>> for >> >>>>>>>>>>>>>>> review >> >>>>>>>>>>>>>>>> in >> >>>>>>>>>>>>>>>>>>>> advance >> >>>>>>>>>>>>>>>>>>>>> of a vote. >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> Regards, >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> Nick >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>> >> >>>>>>> >> >>>>>> >> >>>>> >> >>>> >> >>> >> >> >> > >> >