Hi Bruno,

Thanks for reviewing the KIP. It's been a long road: I started working on this more than a year ago, and most of the time in the last 6 months has been spent on the "Atomic Checkpointing" work that has since been benched, so some of the reasoning behind my decisions has been lost. I'll do my best to reconstruct it.

1.
IIRC, this was the initial approach I tried. I don't remember the exact reasons I changed it to use a separate "view" of the StateStore that encapsulates the transaction, but I believe it had something to do with concurrent access to the StateStore from Interactive Query threads. Reads from interactive queries need to be isolated from the currently ongoing transaction, both for consistency (so interactive queries don't observe changes that are subsequently rolled back), and to prevent Iterators opened by an interactive query from being closed and invalidated by the StreamThread when it commits the transaction, which would cause those interactive queries to crash.

Another reason I implemented it this way was separation of concerns. Recall that newTransaction() originally created an object of type Transaction, not StateStore. My intent was to improve the type-safety of the API, in an effort to ensure Transactions weren't used incorrectly. Unfortunately that didn't pan out, but newTransaction() remained. Finally, this had the added benefit that implementations can easily add support for transactions *without* rewriting their existing, non-transactional implementation. I think this is a benefit both for implementers of custom StateStores and for anyone extending RocksDbStore, as they can rely on the existing access methods working the way they expect.

I'm not too happy with the way the current design has panned out, so I'm open to ideas on how to improve it. Key to this is finding some way to ensure that reads from Interactive Query threads are properly isolated from the transaction, *without* the performance overhead of checking which thread the method is being called from on every access. I've included a rough sketch of the "view" idea below.

As for replacing flush() with commit(): I saw no reason to add that complexity to the KIP unless there was a need to add arguments to the flush/commit method. That need arises with Atomic Checkpointing, but that will be implemented separately, in a future KIP. Do you see a need for some arguments to the flush/commit method that I've missed? Or were you simply suggesting a rename?
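To make that "view" idea concrete, here's a deliberately simplified, self-contained sketch. It uses plain Java maps and made-up names (SimpleStore, Transaction), not the actual KIP interfaces, and it ignores thread-safety; in the real implementation the buffer is a RocksDB WriteBatchWithIndex and reads go through WriteBatchWithIndex#getFromBatchAndDB. The point is only to show the isolation property: reads through the transaction view see its uncommitted writes, while reads against the store itself see only committed data.

// Hypothetical, simplified sketch of the "transaction as a view" idea.
// SimpleStore and Transaction are made-up names, not the KIP's interfaces,
// and thread-safety is omitted for brevity.
import java.util.HashMap;
import java.util.Map;

public class SimpleStore {

    private final Map<String, String> committed = new HashMap<>();

    // Interactive Query threads read the committed data directly, so they
    // never observe writes from an ongoing transaction.
    public String get(final String key) {
        return committed.get(key);
    }

    // The StreamThread obtains a "view" that buffers its writes until commit.
    public Transaction newTransaction() {
        return new Transaction();
    }

    public class Transaction {
        private final Map<String, String> uncommitted = new HashMap<>();

        public void put(final String key, final String value) {
            uncommitted.put(key, value);
        }

        // Reads through the view consult the transaction's own writes first,
        // then fall back to the committed data (analogous to
        // WriteBatchWithIndex#getFromBatchAndDB in the RocksDB implementation).
        public String get(final String key) {
            final String dirty = uncommitted.get(key);
            return dirty != null ? dirty : committed.get(key);
        }

        // Applies the buffered writes to the underlying store and clears the
        // buffer; in the real store this is where the batch would be written
        // to RocksDB.
        public void commit() {
            committed.putAll(uncommitted);
            uncommitted.clear();
        }
    }

    public static void main(final String[] args) {
        final SimpleStore store = new SimpleStore();
        final SimpleStore.Transaction txn = store.newTransaction();
        txn.put("a", "1");
        System.out.println(txn.get("a"));   // "1": visible inside the transaction
        System.out.println(store.get("a")); // null: not visible to IQ until commit
        txn.commit();
        System.out.println(store.get("a")); // "1": visible after commit
    }
}

Under this scheme the StreamThread only ever writes through the view it got from newTransaction(), and IQ threads only ever read the store directly, so no per-call thread check is needed.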
2.
This is simply due to the practical reason that isolationLevel() is really a proxy for checking whether the app is under EOS. The application configuration is not provided to the constructor of StateStores, but it *is* provided to init(), via StateStoreContext, so it seemed natural to add it to StateStoreContext. I think this makes sense: the IsolationLevel of all StateStores in an application *must* be the same, and since those stores are all initialized with the same StateStoreContext, it seems natural for that context to carry the desired IsolationLevel.

3.
Using IsolationLevel, instead of just passing `boolean eosEnabled` as much of the internals does, was an attempt to logically de-couple the StateStore API from the internals of Kafka Streams. Technically, StateStores don't need to know or care what processing mode the Kafka Streams app is using; all they need to know is the isolation level expected of them. Having formal definitions for the expectations of the two required IsolationLevels allows implementers to build transactional stores without having to dig through the internals of Kafka Streams and understand exactly how they are used. The tight coupling between state stores and internal behaviour has significantly hindered my progress on this KIP, and encouraged me to avoid increasing this logical coupling as much as possible.

This also frees implementations to satisfy those requirements in any way they choose. Transactions might not be the only, or even an available, approach for an implementation, which might have an alternative way to satisfy the isolation requirements. I admit that this point is more about semantics, but "transactional" would need to be formally defined in order for implementers to provide a valid implementation, and these IsolationLevels provide that formal definition. I've sketched below how I imagine a store consuming this.
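For what it's worth, here's a minimal sketch of what I'm imagining from the implementer's side, again with stand-in types rather than the real interfaces (the enum and context below are hypothetical simplifications of the proposed StateStoreContext#isolationLevel()). Streams would derive the level from the processing mode, READ_COMMITTED under EOS and READ_UNCOMMITTED under ALOS, and the store just configures itself from the context in init(), without ever looking at processing.guarantee:

// Illustrative stand-in types only; not the actual Kafka Streams API.
class ExampleStore {

    private boolean bufferWritesUntilCommit;

    // The store learns the isolation level expected of it from the context,
    // rather than inspecting the application's processing mode.
    void init(final StoreContext context) {
        this.bufferWritesUntilCommit =
                context.isolationLevel() == IsolationLevel.READ_COMMITTED;
    }

    boolean buffersWritesUntilCommit() {
        return bufferWritesUntilCommit;
    }

    public static void main(final String[] args) {
        final ExampleStore store = new ExampleStore();
        // Stand-in for the context Streams would pass to init() under EOS.
        store.init(() -> IsolationLevel.READ_COMMITTED);
        System.out.println("buffer until commit = " + store.buffersWritesUntilCommit());
    }
}

// Hypothetical accessor; in the KIP this lives on StateStoreContext.
interface StoreContext {
    IsolationLevel isolationLevel();
}

enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

That's the full extent of the coupling I'd like stores to have with the rest of the engine.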
4.
I can remove them. I added them only because I planned to include them in the org.apache.kafka.streams.state package, as a recommended base implementation for all StateStores, including those implemented by users. I had assumed that anything in "public" packages, such as org.apache.kafka.streams.state, should be included in a KIP. Is that wrong?

5.
RocksDB provides no way to measure the actual size of a WriteBatch(WithIndex), so we're limited to tracking the sum of the sizes of the keys + values written to the transaction. This obviously under-estimates the actual memory usage, because WriteBatch no doubt includes some per-record overhead, and WriteBatchWithIndex has to maintain an index. Ideally, we could add a method upstream to WriteBatchInterface that provides the exact size of the batch, but that would require an upgrade of RocksDB, which won't happen soon. For the time being we're stuck with an approximation, so I felt the new method name should reflect that. Would you prefer that the name ignore this constraint, and that we simply make the RocksDB measurement more accurate in the future?

6.
Done.

7.
Very good point. The KIP already specifically calls out memory in the documentation of the config: "Maximum number of memory bytes to be used to buffer uncommitted state-store records." Did you have something else in mind? Should we also make this clearer by renaming the config property itself, perhaps to something like statestore.transaction.buffer.max.bytes?

8.
OK, I can remove this. The intent here was to describe how Streams itself will manage transaction roll-over etc. Presumably that means we also don't need a description of how Streams will manage the commit of changelog transactions, state store transactions and checkpointing?

9.
What do you mean by fail-over? Do you mean failing over an Active Task to an instance already hosting a Standby Task?

Thanks again, and sorry for the essay of a response!

Regards,
Nick

On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> wrote: > Hi Nick, > > Thanks for the updates! > > I really appreciate that you simplified the KIP by removing some > aspects. As I have already told you, I think the removed aspects are > also good ideas and we can discuss them on follow-up KIPs. > > Regarding the current KIP, I have the following feedback. > > 1. > Is there a good reason to add method newTransaction() to the StateStore > interface? As far as I understand, the idea is that users of a state > store (transactional or not) call this method at start-up and after each > commit. Since the call to newTransaction() is done in any case and I > think it would simplify the caller code if we just start a new > transaction after a commit in the implementation? > As far as I understand, you plan to commit the transaction in the > flush() method. I find the idea to replace flush() with commit() > presented in KIP-844 an elegant solution. > > 2. > Why is the method to query the isolation level added to the state store > context? > > 3.
> Do we need all the isolation level definitions? I think it is good to > know the guarantees of the transactionality of the state store. > However, currently, Streams guarantees that there will only be one > transaction that writes to the state store. Only the stream thread that > executes the active task that owns the state store will write to the > state store. I think it should be enough to know if the state store is > transactional or not. So my proposal would be to just add a method on > the state store interface the returns if a state store is transactional > or not by returning a boolean or an enum. > > 4. > I am wondering why AbstractTransaction and AbstractTransactionalStore > are part of the KIP. They look like implementation details that should > not be exposed in the public API. > > 5. > Why does StateStore#approximateNumUncommittedBytes() return an > approximate number of bytes? > > 6. > RocksDB is just one implementation of the state stores in Streams. > However, the issues regarding OOM errors might also apply to other > custom implementations. So in the KIP I would extract that part from > section "RocksDB Transaction". I would also move section "RocksDB > Transaction" to the end of section "Proposed Changes" and handle it as > an example implementation for a state store. > > 7. > Should statestore.uncommitted.max.bytes only limit the uncommitted bytes > or the uncommitted bytes that reside in memory? In future, other > transactional state store implementations might implement a buffer for > uncommitted records that are able to spill records on disk. I think > statestore.uncommitted.max.bytes needs to limit the uncommitted bytes > irrespective if they reside in memory or disk. Since Streams will use > this config to decide if it needs to trigger a commit, state store > implementations that can spill to disk will never be able to spill to > disk. You would only need to change the doc of the config, if you agree > with me. > > 8. > Section "Transaction Management" about the wrappers is rather a > implementation detail that should not be in the KIP. > > 9. > Could you add a section that describes how failover will work with the > transactional state stores? I think section "Error handling" is already > a good start. > > > Best, > Bruno > > > > > On 15.05.23 11:04, Nick Telford wrote: > > Hi everyone, > > > > Quick update: I've added a new section to the KIP: "Offsets for Consumer > > Rebalances", that outlines my solution to the problem that > > StreamsPartitionAssignor needs to read StateStore offsets even if they're > > not currently open. > > > > Regards, > > Nick > > > > On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> > wrote: > > > >> Hi Bruno, > >> > >> Thanks for reviewing my proposal. > >> > >> 1. > >> The main reason I added it was because it was easy to do. If we see no > >> value in it, I can remove it. > >> > >> 2. > >> Global StateStores can have multiple partitions in their input topics > >> (which function as their changelogs), so they would have more than one > >> partition. > >> > >> 3. > >> That's a good point. At present, the only method it adds is > >> isolationLevel(), which is likely not necessary outside of StateStores. > >> It *does* provide slightly different guarantees in the documentation to > >> several of the methods (hence the overrides). I'm not sure if this is > >> enough to warrant a new interface though. 
> >> I think the question that remains is whether this interface makes it > >> easier to implement custom transactional StateStores than if we were to > >> remove it? Probably not. > >> > >> 4. > >> The main motivation for the Atomic Checkpointing is actually > performance. > >> My team has been testing out an implementation of this KIP without it, > and > >> we had problems with RocksDB doing *much* more compaction, due to the > >> significantly increased flush rate. It was enough of a problem that (for > >> the time being), we had to revert back to Kafka Streams proper. > >> I think the best way to solve this, as you say, is to keep the > .checkpoint > >> files *in addition* to the offsets being stored within the store itself. > >> Essentially, when closing StateStores, we force a memtable flush, then > >> call getCommittedOffsets and write those out to the .checkpoint file. > >> That would ensure the metadata is available to the > >> StreamsPartitionAssignor for all closed stores. > >> If there's a crash (no clean close), then we won't be able to guarantee > >> which offsets were flushed to disk by RocksDB, so we'd need to open ( > >> init()), read offsets, and then close() those stores. But since this is > >> the exception, and will only occur once (provided it doesn't crash every > >> time!), I think the performance impact here would be acceptable. > >> > >> Thanks for the feedback, please let me know if you have any more > comments > >> or questions! > >> > >> I'm currently working on rebasing against trunk. This involves adding > >> support for transactionality to VersionedStateStores. I will probably > need > >> to revise my implementation for transactional "segmented" stores, both > to > >> accommodate VersionedStateStore, and to clean up some other stuff. > >> > >> Regards, > >> Nick > >> > >> > >> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> wrote: > >> > >>> Hi Nick, > >>> > >>> Thanks for the updates! > >>> > >>> I have a couple of questions/comments. > >>> > >>> 1. > >>> Why do you propose a configuration that involves max. bytes and max. > >>> reords? I think we are mainly concerned about memory consumption > because > >>> we want to limit the off-heap memory used. I cannot think of a case > >>> where one would want to set the max. number of records. > >>> > >>> > >>> 2. > >>> Why does > >>> > >>> default void commit(final Map<TopicPartition, Long> > changelogOffsets) { > >>> flush(); > >>> } > >>> > >>> take a map of partitions to changelog offsets? > >>> The mapping between state stores to partitions is a 1:1 relationship. > >>> Passing in a single changelog offset should suffice. > >>> > >>> > >>> 3. > >>> Why do we need the Transaction interface? It should be possible to hide > >>> beginning and committing a transactions withing the state store > >>> implementation, so that from outside the state store, it does not > matter > >>> whether the state store is transactional or not. What would be the > >>> advantage of using the Transaction interface? > >>> > >>> > >>> 4. > >>> Regarding checkpointing offsets, I think we should keep the checkpoint > >>> file in any case for the reason you mentioned about rebalancing. Even > if > >>> that would not be an issue, I would propose to move the change to > offset > >>> management to a new KIP and to not add more complexity than needed to > >>> this one. I would not be too concerned about the consistency violation > >>> you mention. 
As far as I understand, with transactional state stores > >>> Streams would write the checkpoint file during every commit even under > >>> EOS. In the failure case you describe, Streams would restore the state > >>> stores from the offsets found in the checkpoint file written during the > >>> penultimate commit instead of during the last commit. Basically, > Streams > >>> would overwrite the records written to the state store between the last > >>> two commits with the same records read from the changelogs. While I > >>> understand that this is wasteful, it is -- at the same time -- > >>> acceptable and most importantly it does not break EOS. > >>> > >>> Best, > >>> Bruno > >>> > >>> > >>> On 27.04.23 12:34, Nick Telford wrote: > >>>> Hi everyone, > >>>> > >>>> I find myself (again) considering removing the offset management from > >>>> StateStores, and keeping the old checkpoint file system. The reason is > >>> that > >>>> the StreamPartitionAssignor directly reads checkpoint files in order > to > >>>> determine which instance has the most up-to-date copy of the local > >>> state. > >>>> If we move offsets into the StateStore itself, then we will need to > >>> open, > >>>> initialize, read offsets and then close each StateStore (that is not > >>>> already assigned and open) for which we have *any* local state, on > every > >>>> rebalance. > >>>> > >>>> Generally, I don't think there are many "orphan" stores like this > >>> sitting > >>>> around on most instances, but even a few would introduce additional > >>> latency > >>>> to an already somewhat lengthy rebalance procedure. > >>>> > >>>> I'm leaning towards Colt's (Slack) suggestion of just keeping things > in > >>> the > >>>> checkpoint file(s) for now, and not worrying about the race. The > >>> downside > >>>> is that we wouldn't be able to remove the explicit RocksDB flush > >>> on-commit, > >>>> which likely hurts performance. > >>>> > >>>> If anyone has any thoughts or ideas on this subject, I would > appreciate > >>> it! > >>>> > >>>> Regards, > >>>> Nick > >>>> > >>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com> > >>> wrote: > >>>> > >>>>> Hi Colt, > >>>>> > >>>>> The issue is that if there's a crash between 2 and 3, then you still > >>> end > >>>>> up with inconsistent data in RocksDB. The only way to guarantee that > >>> your > >>>>> checkpoint offsets and locally stored data are consistent with each > >>> other > >>>>> are to atomically commit them, which can be achieved by having the > >>> offsets > >>>>> stored in RocksDB. > >>>>> > >>>>> The offsets column family is likely to be extremely small (one > >>>>> per-changelog partition + one per Topology input partition for > regular > >>>>> stores, one per input partition for global stores). So the overhead > >>> will be > >>>>> minimal. > >>>>> > >>>>> A major benefit of doing this is that we can remove the explicit > calls > >>> to > >>>>> db.flush(), which forcibly flushes memtables to disk on-commit. It > >>> turns > >>>>> out, RocksDB memtable flushes are largely dictated by Kafka Streams > >>>>> commits, *not* RocksDB configuration, which could be a major source > of > >>>>> confusion. 
Atomic checkpointing makes it safe to remove these > explicit > >>>>> flushes, because it no longer matters exactly when RocksDB flushes > >>> data to > >>>>> disk; since the data and corresponding checkpoint offsets will always > >>> be > >>>>> flushed together, the local store is always in a consistent state, > and > >>>>> on-restart, it can always safely resume restoration from the on-disk > >>>>> offsets, restoring the small amount of data that hadn't been flushed > >>> when > >>>>> the app exited/crashed. > >>>>> > >>>>> Regards, > >>>>> Nick > >>>>> > >>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io> > >>> wrote: > >>>>> > >>>>>> Nick, > >>>>>> > >>>>>> Thanks for your reply. Ack to A) and B). > >>>>>> > >>>>>> For item C), I see what you're referring to. Your proposed solution > >>> will > >>>>>> work, so no need to change it. What I was suggesting was that it > >>> might be > >>>>>> possible to achieve this with only one column family. So long as: > >>>>>> > >>>>>> - No uncommitted records (i.e. not committed to the changelog) > are > >>>>>> *committed* to the state store, AND > >>>>>> - The Checkpoint offset (which refers to the changelog topic) > is > >>> less > >>>>>> than or equal to the last written changelog offset in rocksdb > >>>>>> > >>>>>> I don't see the need to do the full restoration from scratch. My > >>>>>> understanding was that prior to 844/892, full restorations were > >>> required > >>>>>> because there could be uncommitted records written to RocksDB; > >>> however, > >>>>>> given your use of RocksDB transactions, that can be avoided with the > >>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB > >>> transaction, 3) > >>>>>> update offset in checkpoint file. > >>>>>> > >>>>>> Anyways, your proposed solution works equivalently and I don't > believe > >>>>>> there is much overhead to an additional column family in RocksDB. > >>> Perhaps > >>>>>> it may even perform better than making separate writes to the > >>> checkpoint > >>>>>> file. > >>>>>> > >>>>>> Colt McNealy > >>>>>> *Founder, LittleHorse.io* > >>>>>> > >>>>>> > >>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford < > nick.telf...@gmail.com> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Colt, > >>>>>>> > >>>>>>> A. I've done my best to de-couple the StateStore stuff from the > rest > >>> of > >>>>>> the > >>>>>>> Streams engine. The fact that there will be only one ongoing > (write) > >>>>>>> transaction at a time is not guaranteed by any API, and is just a > >>>>>>> consequence of the way Streams operates. To that end, I tried to > >>> ensure > >>>>>> the > >>>>>>> documentation and guarantees provided by the new APIs are > >>> independent of > >>>>>>> this incidental behaviour. In practice, you're right, this > >>> essentially > >>>>>>> refers to "interactive queries", which are technically "read > >>>>>> transactions", > >>>>>>> even if they don't actually use the transaction API to isolate > >>>>>> themselves. > >>>>>>> > >>>>>>> B. Yes, although not ideal. This is for backwards compatibility, > >>>>>> because: > >>>>>>> 1) Existing custom StateStore implementations will implement > >>>>>> flush(), > >>>>>>> and not commit(), but the Streams engine now calls commit(), so > those > >>>>>> calls > >>>>>>> need to be forwarded to flush() for these legacy stores. > >>>>>>> 2) Existing StateStore *users*, i.e. 
outside of the Streams > >>> engine > >>>>>>> itself, may depend on explicitly calling flush(), so for these > cases, > >>>>>>> flush() needs to be redirected to call commit(). > >>>>>>> If anyone has a better way to guarantee compatibility without > >>>>>> introducing > >>>>>>> this potential recursion loop, I'm open to changes! > >>>>>>> > >>>>>>> C. This is described in the "Atomic Checkpointing" section. Offsets > >>> are > >>>>>>> stored in a separate RocksDB column family, which is guaranteed to > be > >>>>>>> atomically flushed to disk with all other column families. The > issue > >>> of > >>>>>>> checkpoints being written to disk after commit causing > inconsistency > >>> if > >>>>>> it > >>>>>>> crashes in between is the reason why, under EOS, checkpoint files > are > >>>>>> only > >>>>>>> written on clean shutdown. This is one of the major causes of "full > >>>>>>> restorations", so moving the offsets into a place where they can be > >>>>>>> guaranteed to be atomically written with the data they checkpoint > >>>>>> allows us > >>>>>>> to write the checkpoint offsets *on every commit*, not just on > clean > >>>>>>> shutdown. > >>>>>>> > >>>>>>> Regards, > >>>>>>> Nick > >>>>>>> > >>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io> > >>> wrote: > >>>>>>> > >>>>>>>> Nick, > >>>>>>>> > >>>>>>>> Thank you for continuing this work. I have a few minor clarifying > >>>>>>>> questions. > >>>>>>>> > >>>>>>>> A) "Records written to any transaction are visible to all other > >>>>>>>> transactions immediately." I am confused here—I thought there > could > >>>>>> only > >>>>>>> be > >>>>>>>> one transaction going on at a time for a given state store given > the > >>>>>>>> threading model for processing records on a Task. Do you mean > >>>>>> Interactive > >>>>>>>> Queries by "other transactions"? (If so, then everything makes > >>> sense—I > >>>>>>>> thought that since IQ were read-only then they didn't count as > >>>>>>>> transactions). > >>>>>>>> > >>>>>>>> B) Is it intentional that the default implementations of the > flush() > >>>>>> and > >>>>>>>> commit() methods in the StateStore class refer to each other in > some > >>>>>> sort > >>>>>>>> of unbounded recursion? > >>>>>>>> > >>>>>>>> C) How will the getCommittedOffset() method work? At first I > thought > >>>>>> the > >>>>>>>> way to do it would be using a special key in the RocksDB store to > >>>>>> store > >>>>>>> the > >>>>>>>> offset, and committing that with the transaction. But upon second > >>>>>>> thought, > >>>>>>>> since restoration from the changelog is an idempotent procedure, I > >>>>>> think > >>>>>>> it > >>>>>>>> would be fine to 1) commit the RocksDB transaction and then 2) > write > >>>>>> the > >>>>>>>> offset to disk in a checkpoint file. If there is a crash between > 1) > >>>>>> and > >>>>>>> 2), > >>>>>>>> I think the only downside is now we replay a few more records (at > a > >>>>>> cost > >>>>>>> of > >>>>>>>> <100ms). Am I missing something there? > >>>>>>>> > >>>>>>>> Other than that, everything makes sense to me. 
> >>>>>>>> > >>>>>>>> Cheers, > >>>>>>>> Colt McNealy > >>>>>>>> *Founder, LittleHorse.io* > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford < > >>> nick.telf...@gmail.com> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi everyone, > >>>>>>>>> > >>>>>>>>> I've updated the KIP to reflect the latest version of the design: > >>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>> > >>>>>>>>> There are several changes in there that reflect feedback from > this > >>>>>>>> thread, > >>>>>>>>> and there's a new section and a bunch of interface changes > relating > >>>>>> to > >>>>>>>>> Atomic Checkpointing, which is the final piece of the puzzle to > >>>>>> making > >>>>>>>>> everything robust. > >>>>>>>>> > >>>>>>>>> Let me know what you think! > >>>>>>>>> > >>>>>>>>> Regards, > >>>>>>>>> Nick > >>>>>>>>> > >>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford < > nick.telf...@gmail.com> > >>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Lucas, > >>>>>>>>>> > >>>>>>>>>> Thanks for looking over my KIP. > >>>>>>>>>> > >>>>>>>>>> A) The bound is per-instance, not per-Task. This was a typo in > the > >>>>>>> KIP > >>>>>>>>>> that I've now corrected. It was originally per-Task, but I > >>>>>> changed it > >>>>>>>> to > >>>>>>>>>> per-instance for exactly the reason you highlighted. > >>>>>>>>>> B) It's worth noting that transactionality is only enabled under > >>>>>> EOS, > >>>>>>>> and > >>>>>>>>>> in the default mode of operation (ALOS), there should be no > >>>>>> change in > >>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the impact > on > >>>>>>>> users > >>>>>>>>> by > >>>>>>>>>> sufficiently low default values for the memory bound > >>>>>> configuration. I > >>>>>>>>>> understand your hesitation to include a significant change of > >>>>>>>> behaviour, > >>>>>>>>>> especially in a minor release, but I suspect that most users > will > >>>>>>>> prefer > >>>>>>>>>> the memory impact (under EOS) to the existing behaviour of > >>>>>> frequent > >>>>>>>> state > >>>>>>>>>> restorations! If this is a problem, the changes can wait until > the > >>>>>>> next > >>>>>>>>>> major release. I'll be running a patched version of streams in > >>>>>>>> production > >>>>>>>>>> with these changes as soon as they're ready, so it won't disrupt > >>>>>> me > >>>>>>> :-D > >>>>>>>>>> C) The main purpose of this sentence was just to note that some > >>>>>>> changes > >>>>>>>>>> will need to be made to the way Segments are handled in order to > >>>>>>> ensure > >>>>>>>>>> they also benefit from transactions. At the time I wrote it, I > >>>>>> hadn't > >>>>>>>>>> figured out the specific changes necessary, so it was > deliberately > >>>>>>>> vague. > >>>>>>>>>> This is the one outstanding problem I'm currently working on, > and > >>>>>>> I'll > >>>>>>>>>> update this section with more detail once I have figured out the > >>>>>>> exact > >>>>>>>>>> changes required. > >>>>>>>>>> D) newTransaction() provides the necessary isolation guarantees. > >>>>>>> While > >>>>>>>>>> the RocksDB implementation of transactions doesn't technically > >>>>>> *need* > >>>>>>>>>> read-only users to call newTransaction(), other implementations > >>>>>>> (e.g. a > >>>>>>>>>> hypothetical PostgresStore) may require it. Calling > >>>>>> newTransaction() > >>>>>>>> when > >>>>>>>>>> no transaction is necessary is essentially free, as it will just > >>>>>>> return > >>>>>>>>>> this. 
> >>>>>>>>>> > >>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think it > >>>>>> should > >>>>>>> be > >>>>>>>>>> fairly obvious where the performance problems stem from: writes > >>>>>> under > >>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with the > >>>>>>>>>> tombstone/record flag, 1 to decode it from the tombstone/record > >>>>>> flag, > >>>>>>>>> and 1 > >>>>>>>>>> to copy the record from the "temporary" store to the "main" > store, > >>>>>>> when > >>>>>>>>> the > >>>>>>>>>> transaction commits. The different approach taken by KIP-869 > >>>>>> should > >>>>>>>>> perform > >>>>>>>>>> much better, as it avoids all these copies, and may actually > >>>>>> perform > >>>>>>>>>> slightly better than trunk, due to batched writes in RocksDB > >>>>>>> performing > >>>>>>>>>> better than non-batched writes.[1] > >>>>>>>>>> > >>>>>>>>>> Regards, > >>>>>>>>>> Nick > >>>>>>>>>> > >>>>>>>>>> 1: > >>>>>>>>> > >>>>>>> > >>> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>>>> > >>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy < > >>>>>> lbruts...@confluent.io > >>>>>>>>> .invalid> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Nick, > >>>>>>>>>>> > >>>>>>>>>>> I'm just starting to read up on the whole discussion about > >>>>>> KIP-892 > >>>>>>> and > >>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think > >>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do have some > >>>>>>>>>>> questions about the latest draft. > >>>>>>>>>>> > >>>>>>>>>>> A) If I understand correctly, you propose to put a bound on > the > >>>>>>>>>>> (native) memory consumed by each task. However, I wonder if > this > >>>>>> is > >>>>>>>>>>> sufficient if we have temporary imbalances in the cluster. For > >>>>>>>>>>> example, depending on the timing of rebalances during a cluster > >>>>>>>>>>> restart, it could happen that a single streams node is > assigned a > >>>>>>> lot > >>>>>>>>>>> more tasks than expected. With your proposed change, this would > >>>>>> mean > >>>>>>>>>>> that the memory required by this one node could be a multiple > of > >>>>>>> what > >>>>>>>>>>> is required during normal operation. I wonder if it wouldn't be > >>>>>>> safer > >>>>>>>>>>> to put a global bound on the memory use, across all tasks. > >>>>>>>>>>> B) Generally, the memory concerns still give me the feeling > >>> that > >>>>>>> this > >>>>>>>>>>> should not be enabled by default for all users in a minor > >>>>>> release. > >>>>>>>>>>> C) In section "Transaction Management": the sentence "A > similar > >>>>>>>>>>> analogue will be created to automatically manage `Segment` > >>>>>>>>>>> transactions.". Maybe this is just me lacking some background, > >>>>>> but I > >>>>>>>>>>> do not understand this, it would be great if you could clarify > >>>>>> what > >>>>>>>>>>> you mean here. > >>>>>>>>>>> D) Could you please clarify why IQ has to call > >>> newTransaction(), > >>>>>>> when > >>>>>>>>>>> it's read-only. > >>>>>>>>>>> > >>>>>>>>>>> And one last thing not strictly related to your KIP: if there > is > >>>>>> an > >>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x slower > >>>>>> (e.g. > >>>>>>>>>>> by providing a flame graph), that would be quite interesting. 
> >>>>>>>>>>> > >>>>>>>>>>> Cheers, > >>>>>>>>>>> Lucas > >>>>>>>>>>> > >>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford < > >>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>> > >>>>>>>>>>>> I've updated the KIP with a more detailed design, which > >>>>>> reflects > >>>>>>> the > >>>>>>>>>>>> implementation I've been working on: > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>> > >>>>>>>>>>>> This new design should address the outstanding points already > >>>>>> made > >>>>>>>> in > >>>>>>>>>>> the > >>>>>>>>>>>> thread. > >>>>>>>>>>>> > >>>>>>>>>>>> Please let me know if there are areas that are unclear or need > >>>>>>> more > >>>>>>>>>>>> clarification. > >>>>>>>>>>>> > >>>>>>>>>>>> I have a (nearly) working implementation. I'm confident that > >>>>>> the > >>>>>>>>>>> remaining > >>>>>>>>>>>> work (making Segments behave) will not impact the documented > >>>>>>> design. > >>>>>>>>>>>> > >>>>>>>>>>>> Regards, > >>>>>>>>>>>> > >>>>>>>>>>>> Nick > >>>>>>>>>>>> > >>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy < > c...@littlehorse.io > >>>>>>> > >>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Nick, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thank you for the reply; that makes sense. I was hoping that, > >>>>>>>> since > >>>>>>>>>>> reading > >>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the > >>>>>> documented > >>>>>>>> API, > >>>>>>>>>>> maybe > >>>>>>>>>>>>> you *wouldn't* have to wait for the next major release to > >>>>>> make > >>>>>>>> that > >>>>>>>>>>> change; > >>>>>>>>>>>>> but given that it would be considered a major change, I like > >>>>>>> your > >>>>>>>>>>> approach > >>>>>>>>>>>>> the best. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Wishing you a speedy recovery and happy coding! > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford < > >>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended to > >>>>>> try > >>>>>>> to > >>>>>>>>>>> keep the > >>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise we'd > >>>>>> have > >>>>>>> to > >>>>>>>>>>> wait for > >>>>>>>>>>>>> a > >>>>>>>>>>>>>> major version release to land these changes. > >>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of > >>>>>> guarantee, > >>>>>>>> and > >>>>>>>>>>> the > >>>>>>>>>>>>>> typically longer commit intervals would be problematic when > >>>>>>>>> reading > >>>>>>>>>>> only > >>>>>>>>>>>>>> "committed" records. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I've been away for 5 days recovering from minor surgery, > >>>>>> but I > >>>>>>>>>>> spent a > >>>>>>>>>>>>>> considerable amount of that time working through ideas for > >>>>>>>>> possible > >>>>>>>>>>>>>> solutions in my head. I think your suggestion of keeping > >>>>>> ALOS > >>>>>>>>>>> as-is, but > >>>>>>>>>>>>>> buffering writes for EOS is the right path forwards, > >>>>>> although > >>>>>>> I > >>>>>>>>>>> have a > >>>>>>>>>>>>>> solution that both expands on this, and provides for some > >>>>>> more > >>>>>>>>>>> formal > >>>>>>>>>>>>>> guarantees. 
> >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for > >>>>>>>> "Transactions", > >>>>>>>>>>> with > >>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read Committed" > >>>>>> when > >>>>>>>> under > >>>>>>>>>>> EOS, > >>>>>>>>>>>>> and > >>>>>>>>>>>>>> "Read Uncommitted" under ALOS. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> The nice thing about this approach is that it gives us much > >>>>>>> more > >>>>>>>>>>> clearly > >>>>>>>>>>>>>> defined isolation behaviour that can be properly > >>>>>> documented to > >>>>>>>>>>> ensure > >>>>>>>>>>>>> users > >>>>>>>>>>>>>> know what to expect. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I'm still working out the kinks in the design, and will > >>>>>> update > >>>>>>>> the > >>>>>>>>>>> KIP > >>>>>>>>>>>>> when > >>>>>>>>>>>>>> I have something. The main struggle is trying to implement > >>>>>>> this > >>>>>>>>>>> without > >>>>>>>>>>>>>> making any major changes to the existing interfaces or > >>>>>>> breaking > >>>>>>>>>>> existing > >>>>>>>>>>>>>> implementations, because currently everything expects to > >>>>>>> operate > >>>>>>>>>>> directly > >>>>>>>>>>>>>> on a StateStore, and not a Transaction of that store. I > >>>>>> think > >>>>>>>> I'm > >>>>>>>>>>> getting > >>>>>>>>>>>>>> close, although sadly I won't be able to progress much > >>>>>> until > >>>>>>>> next > >>>>>>>>>>> week > >>>>>>>>>>>>> due > >>>>>>>>>>>>>> to some work commitments. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy < > >>>>>>> c...@littlehorse.io> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thank you for the explanation, and also for the updated > >>>>>>> KIP. I > >>>>>>>>> am > >>>>>>>>>>> quite > >>>>>>>>>>>>>>> eager for this improvement to be released as it would > >>>>>>> greatly > >>>>>>>>>>> reduce > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> operational difficulties of EOS streams apps. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Two questions: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 10) > >>>>>>>>>>>>>>>> When reading records, we will use the > >>>>>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB > >>>>>>>>>>>>>>> and WriteBatchWithIndex#newIteratorWithBase utilities in > >>>>>>>> order > >>>>>>>>> to > >>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>> that uncommitted writes are available to query. > >>>>>>>>>>>>>>> Why do extra work to enable the reading of uncommitted > >>>>>>> writes > >>>>>>>>>>> during > >>>>>>>>>>>>> IQ? > >>>>>>>>>>>>>>> Code complexity aside, reading uncommitted writes is, in > >>>>>> my > >>>>>>>>>>> opinion, a > >>>>>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have the > >>>>>>>>> guarantee > >>>>>>>>>>> that, > >>>>>>>>>>>>>>> with EOS, IQ only reads committed records. In order to > >>>>>> avoid > >>>>>>>>> dirty > >>>>>>>>>>>>> reads, > >>>>>>>>>>>>>>> one currently must query a standby replica (but this > >>>>>> still > >>>>>>>>> doesn't > >>>>>>>>>>>>> fully > >>>>>>>>>>>>>>> guarantee monotonic reads). > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 20) Is it also necessary to enable this optimization on > >>>>>> ALOS > >>>>>>>>>>> stores? > >>>>>>>>>>>>> The > >>>>>>>>>>>>>>> motivation of KIP-844 was mainly to reduce the need to > >>>>>>> restore > >>>>>>>>>>> state > >>>>>>>>>>>>> from > >>>>>>>>>>>>>>> scratch on unclean EOS shutdowns; with ALOS it was > >>>>>>> acceptable > >>>>>>>> to > >>>>>>>>>>> accept > >>>>>>>>>>>>>>> that there may have been uncommitted writes on disk. 
On a > >>>>>>> side > >>>>>>>>>>> note, if > >>>>>>>>>>>>>> you > >>>>>>>>>>>>>>> enable this type of store on ALOS processors, the > >>>>>> community > >>>>>>>>> would > >>>>>>>>>>>>>>> definitely want to enable queries on dirty reads; > >>>>>> otherwise > >>>>>>>>> users > >>>>>>>>>>> would > >>>>>>>>>>>>>>> have to wait 30 seconds (default) to see an update. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thank you for doing this fantastic work! > >>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford < > >>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no > >>>>>>> longer > >>>>>>>>>>> include > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> StateStore management of checkpointing. This can be > >>>>>> added > >>>>>>>> as a > >>>>>>>>>>> KIP > >>>>>>>>>>>>>> later > >>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>> to further optimize the consistency and performance of > >>>>>>> state > >>>>>>>>>>> stores. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I've also added a section discussing some of the > >>>>>> concerns > >>>>>>>>> around > >>>>>>>>>>>>>>>> concurrency, especially in the presence of Iterators. > >>>>>> I'm > >>>>>>>>>>> thinking of > >>>>>>>>>>>>>>>> wrapping WriteBatchWithIndex with a reference-counting > >>>>>>>>>>> copy-on-write > >>>>>>>>>>>>>>>> implementation (that only makes a copy if there's an > >>>>>>> active > >>>>>>>>>>>>> iterator), > >>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>> I'm open to suggestions. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford < > >>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I didn't do any profiling, but the 844 > >>>>>> implementation: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> - Writes uncommitted records to a temporary > >>>>>> RocksDB > >>>>>>>>>>> instance > >>>>>>>>>>>>>>>>> - Since tombstones need to be flagged, all > >>>>>> record > >>>>>>>>>>> values are > >>>>>>>>>>>>>>>>> prefixed with a value/tombstone marker. This > >>>>>>>>>>> necessitates a > >>>>>>>>>>>>>>> memory > >>>>>>>>>>>>>>>> copy. > >>>>>>>>>>>>>>>>> - On-commit, iterates all records in this > >>>>>> temporary > >>>>>>>>>>> instance and > >>>>>>>>>>>>>>>>> writes them to the main RocksDB store. > >>>>>>>>>>>>>>>>> - While iterating, the value/tombstone marker > >>>>>> needs > >>>>>>> to > >>>>>>>> be > >>>>>>>>>>> parsed > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> the real value extracted. This necessitates > >>>>>> another > >>>>>>>>> memory > >>>>>>>>>>> copy. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> My guess is that the cost of iterating the temporary > >>>>>>>> RocksDB > >>>>>>>>>>> store > >>>>>>>>>>>>> is > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> major factor, with the 2 extra memory copies > >>>>>> per-Record > >>>>>>>>>>>>> contributing > >>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> significant amount too. 
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:12, Colt McNealy < > >>>>>>>>>>> c...@littlehorse.io> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Out of curiosity, why does the performance of the > >>>>>> store > >>>>>>>>>>> degrade so > >>>>>>>>>>>>>>>>>> significantly with the 844 implementation? I > >>>>>> wouldn't > >>>>>>> be > >>>>>>>>> too > >>>>>>>>>>>>>> surprised > >>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>> 50-60% drop (caused by each record being written > >>>>>>> twice), > >>>>>>>>> but > >>>>>>>>>>> 96% > >>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>> extreme. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> The only thing I can think of which could create > >>>>>> such a > >>>>>>>>>>> bottleneck > >>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>> that perhaps the 844 implementation deserializes and > >>>>>>> then > >>>>>>>>>>>>>>> re-serializes > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> store values when copying from the uncommitted to > >>>>>>>> committed > >>>>>>>>>>> store, > >>>>>>>>>>>>>>> but I > >>>>>>>>>>>>>>>>>> wasn't able to figure that out when I scanned the > >>>>>> PR. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford < > >>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I've updated the KIP to resolve all the points > >>>>>> that > >>>>>>>> have > >>>>>>>>>>> been > >>>>>>>>>>>>>> raised > >>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>> far, with one exception: the ALOS default commit > >>>>>>>> interval > >>>>>>>>>>> of 5 > >>>>>>>>>>>>>>> minutes > >>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>> likely to cause WriteBatchWithIndex memory to grow > >>>>>>> too > >>>>>>>>>>> large. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> There's a couple of different things I can think > >>>>>> of > >>>>>>> to > >>>>>>>>>>> solve > >>>>>>>>>>>>> this: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> - We already have a memory/record limit in the > >>>>>> KIP > >>>>>>>> to > >>>>>>>>>>> prevent > >>>>>>>>>>>>>> OOM > >>>>>>>>>>>>>>>>>>> errors. Should we choose a default value for > >>>>>>> these? > >>>>>>>> My > >>>>>>>>>>>>> concern > >>>>>>>>>>>>>>> here > >>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> anything we choose might seem rather > >>>>>> arbitrary. We > >>>>>>>>> could > >>>>>>>>>>>>> change > >>>>>>>>>>>>>>>>>>> its behaviour such that under ALOS, it only > >>>>>>> triggers > >>>>>>>>> the > >>>>>>>>>>>>> commit > >>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> StateStore, but under EOS, it triggers a > >>>>>> commit of > >>>>>>>> the > >>>>>>>>>>> Kafka > >>>>>>>>>>>>>>>>>>> transaction. > >>>>>>>>>>>>>>>>>>> - We could introduce a separate ` > >>>>>>>>> checkpoint.interval.ms` > >>>>>>>>>>> to > >>>>>>>>>>>>>>> allow > >>>>>>>>>>>>>>>>>> ALOS > >>>>>>>>>>>>>>>>>>> to commit the StateStores more frequently than > >>>>>> the > >>>>>>>>>>> general > >>>>>>>>>>>>>>>>>>> commit.interval.ms? 
My concern here is that > >>>>>> the > >>>>>>>>>>> semantics of > >>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>> config > >>>>>>>>>>>>>>>>>>> would depend on the processing.mode; under > >>>>>> ALOS it > >>>>>>>>> would > >>>>>>>>>>>>> allow > >>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>> frequently committing stores, whereas under > >>>>>> EOS it > >>>>>>>>>>> couldn't. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Any better ideas? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 16:25, Nick Telford < > >>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Alex, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Thanks for the feedback. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I've updated the discussion of OOM issues by > >>>>>>>> describing > >>>>>>>>>>> how > >>>>>>>>>>>>>> we'll > >>>>>>>>>>>>>>>>>> handle > >>>>>>>>>>>>>>>>>>>> it. Here's the new text: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> To mitigate this, we will automatically force a > >>>>>>> Task > >>>>>>>>>>> commit if > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> total > >>>>>>>>>>>>>>>>>>>>> uncommitted records returned by > >>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedEntries() > >>>>>>>>> exceeds a > >>>>>>>>>>>>>>> threshold, > >>>>>>>>>>>>>>>>>>>>> configured by > >>>>>>>> max.uncommitted.state.entries.per.task; > >>>>>>>>>>> or the > >>>>>>>>>>>>>>> total > >>>>>>>>>>>>>>>>>>>>> memory used for buffering uncommitted records > >>>>>>>> returned > >>>>>>>>>>> by > >>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes() > >>>>>>> exceeds > >>>>>>>>> the > >>>>>>>>>>>>>> threshold > >>>>>>>>>>>>>>>>>>>>> configured by > >>>>>>> max.uncommitted.state.bytes.per.task. > >>>>>>>>>>> This will > >>>>>>>>>>>>>>>> roughly > >>>>>>>>>>>>>>>>>>>>> bound the memory required per-Task for > >>>>>> buffering > >>>>>>>>>>> uncommitted > >>>>>>>>>>>>>>>> records, > >>>>>>>>>>>>>>>>>>>>> irrespective of the commit.interval.ms, and > >>>>>> will > >>>>>>>>>>> effectively > >>>>>>>>>>>>>>> bound > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> number of records that will need to be > >>>>>> restored in > >>>>>>>> the > >>>>>>>>>>> event > >>>>>>>>>>>>>> of a > >>>>>>>>>>>>>>>>>>> failure. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> These limits will be checked in > >>>>>> StreamTask#process > >>>>>>>> and > >>>>>>>>> a > >>>>>>>>>>>>>> premature > >>>>>>>>>>>>>>>>>> commit > >>>>>>>>>>>>>>>>>>>>> will be requested via Task#requestCommit(). > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Note that these new methods provide default > >>>>>>>>>>> implementations > >>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>>>>>> existing custom stores and non-transactional > >>>>>>> stores > >>>>>>>>>>> (e.g. > >>>>>>>>>>>>>>>>>>>>> InMemoryKeyValueStore) do not force any early > >>>>>>>> commits. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I've chosen to have the StateStore expose > >>>>>>>>> approximations > >>>>>>>>>>> of > >>>>>>>>>>>>> its > >>>>>>>>>>>>>>>> buffer > >>>>>>>>>>>>>>>>>>>> size/count instead of opaquely requesting a > >>>>>> commit > >>>>>>> in > >>>>>>>>>>> order to > >>>>>>>>>>>>>>>>>> delegate > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> decision making to the Task itself. 
This enables > >>>>>>>> Tasks > >>>>>>>>>>> to look > >>>>>>>>>>>>>> at > >>>>>>>>>>>>>>>>>> *all* > >>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>> their StateStores, and determine whether an > >>>>>> early > >>>>>>>>> commit > >>>>>>>>>>> is > >>>>>>>>>>>>>>>> necessary. > >>>>>>>>>>>>>>>>>>>> Notably, it enables pre-Task thresholds, > >>>>>> instead of > >>>>>>>>>>> per-Store, > >>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>> prevents Tasks with many StateStores from using > >>>>>>> much > >>>>>>>>> more > >>>>>>>>>>>>> memory > >>>>>>>>>>>>>>>> than > >>>>>>>>>>>>>>>>>>> Tasks > >>>>>>>>>>>>>>>>>>>> with one StateStore. This makes sense, since > >>>>>>> commits > >>>>>>>>> are > >>>>>>>>>>> done > >>>>>>>>>>>>>>>> by-Task, > >>>>>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>> by-Store. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Prizes* for anyone who can come up with a better > >>>>>>> name > >>>>>>>>>>> for the > >>>>>>>>>>>>>> new > >>>>>>>>>>>>>>>>>> config > >>>>>>>>>>>>>>>>>>>> properties! > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Thanks for pointing out the potential > >>>>>> performance > >>>>>>>>> issues > >>>>>>>>>>> of > >>>>>>>>>>>>>> WBWI. > >>>>>>>>>>>>>>>> From > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> benchmarks that user posted[1], it looks like > >>>>>> WBWI > >>>>>>>>> still > >>>>>>>>>>>>>> performs > >>>>>>>>>>>>>>>>>>>> considerably better than individual puts, which > >>>>>> is > >>>>>>>> the > >>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>> design, > >>>>>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>>> I'd actually expect a performance boost from > >>>>>> WBWI, > >>>>>>>> just > >>>>>>>>>>> not as > >>>>>>>>>>>>>>> great > >>>>>>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>>>>> we'd get from a plain WriteBatch. This does > >>>>>> suggest > >>>>>>>>> that > >>>>>>>>>>> a > >>>>>>>>>>>>> good > >>>>>>>>>>>>>>>>>>>> optimization would be to use a regular > >>>>>> WriteBatch > >>>>>>> for > >>>>>>>>>>>>>> restoration > >>>>>>>>>>>>>>>> (in > >>>>>>>>>>>>>>>>>>>> RocksDBStore#restoreBatch), since we know that > >>>>>>> those > >>>>>>>>>>> records > >>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>> never > >>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>> queried before they're committed. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 1: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>> > >>>>>> > >>> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> * Just kidding, no prizes, sadly. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 12:28, Alexander > >>>>>> Sorokoumov > >>>>>>>>>>>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hey Nick, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thank you for the KIP! With such a significant > >>>>>>>>>>> performance > >>>>>>>>>>>>>>>>>> degradation > >>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>> the secondary store approach, we should > >>>>>> definitely > >>>>>>>>>>> consider > >>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex. I also like encapsulating > >>>>>>>>>>> checkpointing > >>>>>>>>>>>>>>> inside > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> default state store implementation to improve > >>>>>>>>>>> performance. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> +1 to John's comment to keep the current > >>>>>>>> checkpointing > >>>>>>>>>>> as a > >>>>>>>>>>>>>>>> fallback > >>>>>>>>>>>>>>>>>>>>> mechanism. 
We want to keep existing users' > >>>>>>> workflows > >>>>>>>>>>> intact > >>>>>>>>>>>>> if > >>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>> can. A > >>>>>>>>>>>>>>>>>>>>> non-intrusive way would be to add a separate > >>>>>>>>> StateStore > >>>>>>>>>>>>> method, > >>>>>>>>>>>>>>>> say, > >>>>>>>>>>>>>>>>>>>>> StateStore#managesCheckpointing(), that > >>>>>> controls > >>>>>>>>>>> whether the > >>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>> store > >>>>>>>>>>>>>>>>>>>>> implementation owns checkpointing. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> I think that a solution to the transactional > >>>>>>> writes > >>>>>>>>>>> should > >>>>>>>>>>>>>>> address > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> OOMEs. One possible way to address that is to > >>>>>> wire > >>>>>>>>>>>>> StateStore's > >>>>>>>>>>>>>>>>>> commit > >>>>>>>>>>>>>>>>>>>>> request by adding, say, StateStore#commitNeeded > >>>>>>> that > >>>>>>>>> is > >>>>>>>>>>>>> checked > >>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>> StreamTask#commitNeeded via the corresponding > >>>>>>>>>>>>>>>> ProcessorStateManager. > >>>>>>>>>>>>>>>>>>> With > >>>>>>>>>>>>>>>>>>>>> that change, RocksDBStore will have to track > >>>>>> the > >>>>>>>>> current > >>>>>>>>>>>>>>>> transaction > >>>>>>>>>>>>>>>>>>> size > >>>>>>>>>>>>>>>>>>>>> and request a commit when the size goes over a > >>>>>>>>>>> (configurable) > >>>>>>>>>>>>>>>>>> threshold. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> AFAIU WriteBatchWithIndex might perform > >>>>>>>> significantly > >>>>>>>>>>> slower > >>>>>>>>>>>>>> than > >>>>>>>>>>>>>>>>>>> non-txn > >>>>>>>>>>>>>>>>>>>>> puts as the batch size grows [1]. We should > >>>>>> have a > >>>>>>>>>>>>>> configuration > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> fall > >>>>>>>>>>>>>>>>>>>>> back to the current behavior (and/or disable > >>>>>> txn > >>>>>>>>> stores > >>>>>>>>>>> for > >>>>>>>>>>>>>> ALOS) > >>>>>>>>>>>>>>>>>> unless > >>>>>>>>>>>>>>>>>>>>> the benchmarks show negligible overhead for > >>>>>> longer > >>>>>>>>>>> commits / > >>>>>>>>>>>>>>>>>>> large-enough > >>>>>>>>>>>>>>>>>>>>> batch sizes. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> If you prefer to keep the KIP smaller, I would > >>>>>>>> rather > >>>>>>>>>>> cut out > >>>>>>>>>>>>>>>>>>>>> state-store-managed checkpointing rather than > >>>>>>> proper > >>>>>>>>>>> OOMe > >>>>>>>>>>>>>>> handling > >>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>> being able to switch to non-txn behavior. The > >>>>>>>>>>> checkpointing > >>>>>>>>>>>>> is > >>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>>> necessary to solve the recovery-under-EOS > >>>>>> problem. > >>>>>>>> On > >>>>>>>>>>> the > >>>>>>>>>>>>> other > >>>>>>>>>>>>>>>> hand, > >>>>>>>>>>>>>>>>>>> once > >>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex is in, it will be much > >>>>>> easier > >>>>>>> to > >>>>>>>>> add > >>>>>>>>>>>>>>>>>>>>> state-store-managed checkpointing. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> If you share the current implementation, I am > >>>>>>> happy > >>>>>>>> to > >>>>>>>>>>> help > >>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>> address > >>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> OOMe and configuration parts as well as review > >>>>>> and > >>>>>>>>> test > >>>>>>>>>>> the > >>>>>>>>>>>>>>> patch. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>>>> Alex > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 1. 
> >>>>>> https://github.com/facebook/rocksdb/issues/608 > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford < > >>>>>>>>>>>>>>>> nick.telf...@gmail.com > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi John, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the review and feedback! > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 1. Custom Stores: I've been mulling over this > >>>>>>>>> problem > >>>>>>>>>>>>> myself. > >>>>>>>>>>>>>>> As > >>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>> stands, > >>>>>>>>>>>>>>>>>>>>>> custom stores would essentially lose > >>>>>>> checkpointing > >>>>>>>>>>> with no > >>>>>>>>>>>>>>>>>> indication > >>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>> they're expected to make changes, besides a > >>>>>> line > >>>>>>>> in > >>>>>>>>>>> the > >>>>>>>>>>>>>> release > >>>>>>>>>>>>>>>>>>> notes. I > >>>>>>>>>>>>>>>>>>>>>> agree that the best solution would be to > >>>>>>> provide a > >>>>>>>>>>> default > >>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>> checkpoints > >>>>>>>>>>>>>>>>>>>>>> to a file. The one thing I would change is > >>>>>> that > >>>>>>>> the > >>>>>>>>>>>>>>> checkpointing > >>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>> store-local file, instead of a per-Task file. > >>>>>>> This > >>>>>>>>>>> way the > >>>>>>>>>>>>>>>>>> StateStore > >>>>>>>>>>>>>>>>>>>>> still > >>>>>>>>>>>>>>>>>>>>>> technically owns its own checkpointing (via a > >>>>>>>>> default > >>>>>>>>>>>>>>>>>> implementation), > >>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>> the StateManager/Task execution engine > >>>>>> doesn't > >>>>>>>> need > >>>>>>>>>>> to know > >>>>>>>>>>>>>>>>>> anything > >>>>>>>>>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>>>>>> checkpointing, which greatly simplifies some > >>>>>> of > >>>>>>>> the > >>>>>>>>>>> logic. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 2. OOME errors: The main reasons why I didn't > >>>>>>>>> explore > >>>>>>>>>>> a > >>>>>>>>>>>>>>> solution > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> this is > >>>>>>>>>>>>>>>>>>>>>> a) to keep this KIP as simple as possible, > >>>>>> and > >>>>>>> b) > >>>>>>>>>>> because > >>>>>>>>>>>>> I'm > >>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>>> exactly > >>>>>>>>>>>>>>>>>>>>>> how to signal that a Task should commit > >>>>>>>> prematurely. > >>>>>>>>>>> I'm > >>>>>>>>>>>>>>>> confident > >>>>>>>>>>>>>>>>>>> it's > >>>>>>>>>>>>>>>>>>>>>> possible, and I think it's worth adding a > >>>>>>> section > >>>>>>>> on > >>>>>>>>>>>>> handling > >>>>>>>>>>>>>>>> this. > >>>>>>>>>>>>>>>>>>>>> Besides > >>>>>>>>>>>>>>>>>>>>>> my proposal to force an early commit once > >>>>>> memory > >>>>>>>>> usage > >>>>>>>>>>>>>> reaches > >>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>> threshold, > >>>>>>>>>>>>>>>>>>>>>> is there any other approach that you might > >>>>>>> suggest > >>>>>>>>> for > >>>>>>>>>>>>>> tackling > >>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>> problem? > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 3. 
3. ALOS: I can add in an explicit paragraph, but my assumption is that since transactional behaviour comes at little/no cost, it should be available by default on all stores, irrespective of the processing mode. While ALOS doesn't use transactions, the Task itself still "commits", so the behaviour should be correct under ALOS too. I'm not convinced that it's worth having both transactional/non-transactional stores available, as it would considerably increase the complexity of the codebase for very little benefit.

4. Method deprecation: Are you referring to StateStore#getPosition()? As I understand it, Position contains the position of the *source* topics, whereas the commit offsets would be the *changelog* offsets. So it's still necessary to retain the Position data, as well as the changelog offsets. What I meant in the KIP is that Position offsets are currently stored in a file, and since we can atomically store metadata along with the record batch we commit to RocksDB, we can move our Position offsets into this metadata too, and gain the same transactional guarantees that we will for changelog offsets, ensuring that the Position offsets are consistent with the records that are read from the database.
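A simplified sketch of the "offsets as metadata in the same batch" idea from point 4 (the "offsets" column family, the key names, and the single-long offsets are all simplifications made up for illustration): the buffered records, the changelog offset, and the Position offset are appended to one WriteBatch and written in a single atomic RocksDB write, so they can never get out of sync with each other.

import java.nio.charset.StandardCharsets;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

class AtomicOffsetCommit {

    static void commit(final RocksDB db,
                       final ColumnFamilyHandle offsetsColumnFamily,
                       final WriteBatch uncommittedRecords,
                       final long changelogOffset,
                       final long positionOffset) throws RocksDBException {
        // Append the offset metadata to the same batch as the buffered records...
        uncommittedRecords.put(offsetsColumnFamily,
                "changelog".getBytes(StandardCharsets.UTF_8),
                Long.toString(changelogOffset).getBytes(StandardCharsets.UTF_8));
        uncommittedRecords.put(offsetsColumnFamily,
                "position".getBytes(StandardCharsets.UTF_8),
                Long.toString(positionOffset).getBytes(StandardCharsets.UTF_8));

        // ...then commit everything in one atomic write.
        try (WriteOptions options = new WriteOptions()) {
            db.write(options, uncommittedRecords);
        }
    }
}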
Regards,
Nick

On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:

Thanks for publishing this alternative, Nick!

The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis, showing that for most use cases, the write batch approach should be just fine.

There are a couple of points that would hold me back from approving this KIP right now:

1. Loss of coverage for custom stores.
The fact that you can plug in a (relatively) simple implementation of the XStateStore interfaces and automagically get a distributed database out of it is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be better to spend some time and come up with a way to preserve that property. For example, can we provide a default implementation of `commit(..)` that re-implements the existing checkpoint-file approach? Or perhaps add an `isTransactional()` flag to the state store interface so that the runtime can decide whether to continue to manage checkpoint files vs delegating transactionality to the stores?
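For reference, the `isTransactional()` flag mentioned above could be as small as a default method on the store interface; the snippet below is only a guess at its shape (shown on a stand-in interface rather than the real org.apache.kafka.streams.processor.StateStore):

// Stand-in for the real StateStore interface; existing methods omitted.
public interface StateStoreWithTransactions {

    // Defaults to false so existing custom stores keep today's behaviour:
    // the runtime continues to manage checkpoint files for them, and only
    // stores that opt in take over transactionality themselves.
    default boolean isTransactional() {
        return false;
    }
}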
2. Guarding against OOME
I appreciate your analysis, but I don't think it's sufficient to say that we will solve the memory problem later if it becomes necessary. The experience leading to that situation would be quite bad: imagine you upgrade to AK 3.next, your tests pass, so you deploy to production. That night, you get paged because your app is now crashing with OOMEs. As with all OOMEs, you'll have a really hard time finding the root cause, and once you do, you won't have a clear path to resolve the issue. You could only tune down the commit interval and cache buffer size until you stop getting crashes.

FYI, I know of multiple cases where people run EOS with much larger commit intervals to get better batching than the default, so I don't think this pathological case would be as rare as you suspect.

Given that we already have the rudiments of an idea of what we could do to prevent this downside, we should take the time to design a solution. We owe it to our users to ensure that awesome new features don't come with bitter pills unless we can't avoid it.

3. ALOS mode.
On the other hand, I didn't see an indication of how stores will be handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store and the processing mode are orthogonal. A transactional store would serve ALOS just as well as a non-transactional one (if not better). Under ALOS, though, the default commit interval is five minutes, so the memory issue is far more pressing.

As I see it, we have several options to resolve this point. We could demonstrate that transactional stores work just fine for ALOS, and we can therefore just swap over unconditionally.
We could also disable the transactional mechanism under ALOS so that stores operate just the same as they do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and make transactional stores opt-in (it'd be better to avoid the extra opt-in mechanism, but it's a good get-out-of-jail-free card).

4. (minor point) Deprecation of methods

You mentioned that the new `commit` method replaces flush, updateChangelogOffsets, and checkpoint. It seems to me that the point about atomicity and Position also suggests that it replaces the Position callbacks. However, the proposal only deprecates `flush`. Should we be deprecating other methods as well?

Thanks again for the KIP! It's really nice that you and Alex will get the chance to collaborate on both directions so that we can get the best outcome for Streams and its users.

-John
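For point 4, one plausible shape for the consolidated method is shown below; the exact signature belongs to the KIP document, so treat this purely as a sketch of the idea that a single atomic call subsumes flush, updateChangelogOffsets and checkpoint (and potentially the Position callbacks):

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Sketch only: not the actual KIP-892 interface.
public interface TransactionalStateStore {

    // Atomically commits all written-but-uncommitted records together with the
    // changelog offsets (and possibly Position offsets) to checkpoint alongside them.
    void commit(Map<TopicPartition, Long> changelogOffsets);
}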
On 2022/11/21 15:02:15 Nick Telford wrote:
Hi everyone,

As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores.

I've published this separately as KIP-892: Transactional Semantics for StateStores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
so that it can be discussed/reviewed separately from KIP-844.

Alex: I'm especially interested in what you think!

I have a nearly complete implementation of the changes outlined in this KIP, please let me know if you'd like me to push them for review in advance of a vote.

Regards,

Nick