Hi John, By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any class called "RecordCache"...
Cheers, Nick On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org> wrote: > Hi Nick, > > Thanks for picking this up again! > > I did have one new thought over the intervening months, which I'd like > your take on. > > What if, instead of using the RocksDB atomic write primitive at all, we > instead just: > 1. disable memtables entirely > 2. directly write the RecordCache into SST files when we flush > 3. atomically ingest the SST file(s) into RocksDB when we get the ACK > from the changelog (see > > https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md > and > > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java > and > > https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429 > ) > 4. track the changelog offsets either in another CF or the same CF with > a reserved key, either of which will make the changelog offset update > atomic with the file ingestions > > I suspect this'll have a number of benefits: > * writes to RocksDB will always be atomic > * we don't fragment memory between the RecordCache and the memtables > * RecordCache gives far higher performance than memtable for reads and > writes > * we don't need any new "transaction" concepts or memory bound configs > > What do you think? > > Thanks, > -John > > On 6/20/23 10:51, Nick Telford wrote: > > Hi Bruno, > > > > Thanks for reviewing the KIP. It's been a long road, I started working on > > this more than a year ago, and most of the time in the last 6 months has > > been spent on the "Atomic Checkpointing" stuff that's been benched, so > some > > of the reasoning behind some of my decisions have been lost, but I'll do > my > > best to reconstruct them. > > > > 1. > > IIRC, this was the initial approach I tried. I don't remember the exact > > reasons I changed it to use a separate "view" of the StateStore that > > encapsulates the transaction, but I believe it had something to do with > > concurrent access to the StateStore from Interactive Query threads. Reads > > from interactive queries need to be isolated from the currently ongoing > > transaction, both for consistency (so interactive queries don't observe > > changes that are subsequently rolled-back), but also to prevent Iterators > > opened by an interactive query from being closed and invalidated by the > > StreamThread when it commits the transaction, which causes your > interactive > > queries to crash. > > > > Another reason I believe I implemented it this way was a separation of > > concerns. Recall that newTransaction() originally created an object of > type > > Transaction, not StateStore. My intent was to improve the type-safety of > > the API, in an effort to ensure Transactions weren't used incorrectly. > > Unfortunately, this didn't pan out, but newTransaction() remained. > > > > Finally, this had the added benefit that implementations could easily add > > support for transactions *without* re-writing their existing, > > non-transactional implementation. I think this can be a benefit both for > > implementers of custom StateStores, but also for anyone extending > > RocksDbStore, as they can rely on the existing access methods working how > > they expect them to. > > > > I'm not too happy with the way the current design has panned out, so I'm > > open to ideas on how to improve it. 
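
[As a rough sketch of the SST-file ingestion approach John suggests in the quoted message above, assuming the org.rocksdb Java API; the paths and keys are placeholders and error handling is elided, so this only illustrates the mechanism, not a proposed implementation:]

import java.util.Collections;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

public class SstIngestSketch {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/sst-ingest-sketch")) {

            // 1. On "flush", write the buffered records straight to an SST file.
            //    SstFileWriter requires keys to be added in ascending order.
            final String sstPath = "/tmp/sst-ingest-sketch-000001.sst";
            try (EnvOptions envOptions = new EnvOptions();
                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(sstPath);
                writer.put("key-1".getBytes(), "value-1".getBytes());
                writer.put("key-2".getBytes(), "value-2".getBytes());
                writer.finish();
            }

            // 2. Once the changelog writes are ACKed, atomically ingest the file.
            try (IngestExternalFileOptions ingestOptions =
                     new IngestExternalFileOptions().setMoveFiles(true)) {
                db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
            }
        }
    }
}

[The changelog offsets could then be tracked with a reserved key in the same file, or in a dedicated column family, as John describes, so that the offset update lands together with the ingested data.]
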
Key to this is finding some way to > > ensure that reads from Interactive Query threads are properly isolated > from > > the transaction, *without* the performance overhead of checking which > > thread the method is being called from on every access. > > > > As for replacing flush() with commit() - I saw no reason to add this > > complexity to the KIP, unless there was a need to add arguments to the > > flush/commit method. This need arises with Atomic Checkpointing, but that > > will be implemented separately, in a future KIP. Do you see a need for > some > > arguments to the flush/commit method that I've missed? Or were you simply > > suggesting a rename? > > > > 2. > > This is simply due to the practical reason that isolationLevel() is > really > > a proxy for checking if the app is under EOS. The application > configuration > > is not provided to the constructor of StateStores, but it *is* provided > to > > init(), via StateStoreContext. For this reason, it seemed somewhat > natural > > to add it to StateStoreContext. I think this makes sense, since the > > IsolationLevel of all StateStores in an application *must* be the same, > and > > since those stores are all initialized with the same StateStoreContext, > it > > seems natural for that context to carry the desired IsolationLevel to > use. > > > > 3. > > Using IsolationLevel instead of just passing `boolean eosEnabled`, like > > much of the internals was an attempt to logically de-couple the > StateStore > > API from the internals of Kafka Streams. Technically, StateStores don't > > need to know/care what processing mode the KS app is using, all they need > > to know is the isolation level expected of them. > > > > Having formal definitions for the expectations of the two required > > IsolationLevels allow implementers to implement transactional stores > > without having to dig through the internals of Kafka Streams and > understand > > exactly how they are used. The tight coupling between state stores and > > internal behaviour has actually significantly hindered my progress on > this > > KIP, and encouraged me to avoid increasing this logical coupling as much > as > > possible. > > > > This also frees implementations to satisfy those requirements in any way > > they choose. Transactions might not be the only/available approach to an > > implementation, but they might have an alternative way to satisfy the > > isolation requirements. I admit that this point is more about semantics, > > but "transactional" would need to be formally defined in order for > > implementers to provide a valid implementation, and these IsolationLevels > > provide that formal definition. > > > > 4. > > I can remove them. I added them only as I planned to include them in the > > org.apache.kafka.streams.state package, as a recommended base > > implementation for all StateStores, including those implemented by > users. I > > had assumed that anything in "public" packages, such as > > org.apache.kafka.streams.state, should be included in a KIP. Is that > wrong? > > > > 5. > > RocksDB provides no way to measure the actual size of a > > WriteBatch(WithIndex), so we're limited to tracking the sum total of the > > size of keys + values that are written to the transaction. This obviously > > under-estimates the actual memory usage, because WriteBatch no-doubt > > includes some record overheads, and WriteBatchWithIndex has to maintain > an > > index. 
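
[A minimal sketch of the approximation described above, summing key and value lengths on each write because WriteBatch(WithIndex) exposes no exact size; the class and method names here are illustrative, not the KIP's API:]

import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;

class ApproximatelySizedBatch {
    private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
    private long approximateBytes = 0;

    void put(final byte[] key, final byte[] value) throws RocksDBException {
        batch.put(key, value);
        // Under-estimates real memory usage: per-record overheads and the
        // WBWI index are not included, only the raw key/value sizes.
        approximateBytes += key.length + value.length;
    }

    void delete(final byte[] key) throws RocksDBException {
        batch.delete(key);
        approximateBytes += key.length;
    }

    long approximateNumUncommittedBytes() {
        return approximateBytes;
    }
}
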
> > > > Ideally, we could trivially add a method upstream to WriteBatchInterface > > that provides the exact size of the batch, but that would require an > > upgrade of RocksDB, which won't happen soon. So for the time being, we're > > stuck with an approximation, so I felt that the new method should reflect > > that. > > > > Would you prefer the new method name ignores this constraint and that we > > simply make the rocks measurement more accurate in the future? > > > > 6. > > Done > > > > 7. > > Very good point. The KIP already specifically calls out memory in the > > documentation of the config: "Maximum number of memory bytes to be used > to > > buffer uncommitted state-store records." - did you have something else in > > mind? > > > > Should we also make this clearer by renaming the config property itself? > > Perhaps to something like statestore.transaction.buffer.max.bytes? > > > > 8. > > OK, I can remove this. The intent here was to describe how Streams itself > > will manage transaction roll-over etc. Presumably that means we also > don't > > need a description of how Streams will manage the commit of changelog > > transactions, state store transactions and checkpointing? > > > > 9. > > What do you mean by fail-over? Do you mean failing over an Active Task to > > an instance already hosting a Standby Task? > > > > Thanks again and sorry for the essay of a response! > > > > Regards, > > Nick > > > > On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> wrote: > > > >> Hi Nick, > >> > >> Thanks for the updates! > >> > >> I really appreciate that you simplified the KIP by removing some > >> aspects. As I have already told you, I think the removed aspects are > >> also good ideas and we can discuss them on follow-up KIPs. > >> > >> Regarding the current KIP, I have the following feedback. > >> > >> 1. > >> Is there a good reason to add method newTransaction() to the StateStore > >> interface? As far as I understand, the idea is that users of a state > >> store (transactional or not) call this method at start-up and after each > >> commit. Since the call to newTransaction() is done in any case and I > >> think it would simplify the caller code if we just start a new > >> transaction after a commit in the implementation? > >> As far as I understand, you plan to commit the transaction in the > >> flush() method. I find the idea to replace flush() with commit() > >> presented in KIP-844 an elegant solution. > >> > >> 2. > >> Why is the method to query the isolation level added to the state store > >> context? > >> > >> 3. > >> Do we need all the isolation level definitions? I think it is good to > >> know the guarantees of the transactionality of the state store. > >> However, currently, Streams guarantees that there will only be one > >> transaction that writes to the state store. Only the stream thread that > >> executes the active task that owns the state store will write to the > >> state store. I think it should be enough to know if the state store is > >> transactional or not. So my proposal would be to just add a method on > >> the state store interface the returns if a state store is transactional > >> or not by returning a boolean or an enum. > >> > >> 4. > >> I am wondering why AbstractTransaction and AbstractTransactionalStore > >> are part of the KIP. They look like implementation details that should > >> not be exposed in the public API. > >> > >> 5. > >> Why does StateStore#approximateNumUncommittedBytes() return an > >> approximate number of bytes? > >> > >> 6. 
> >> RocksDB is just one implementation of the state stores in Streams. > >> However, the issues regarding OOM errors might also apply to other > >> custom implementations. So in the KIP I would extract that part from > >> section "RocksDB Transaction". I would also move section "RocksDB > >> Transaction" to the end of section "Proposed Changes" and handle it as > >> an example implementation for a state store. > >> > >> 7. > >> Should statestore.uncommitted.max.bytes only limit the uncommitted bytes > >> or the uncommitted bytes that reside in memory? In future, other > >> transactional state store implementations might implement a buffer for > >> uncommitted records that are able to spill records on disk. I think > >> statestore.uncommitted.max.bytes needs to limit the uncommitted bytes > >> irrespective if they reside in memory or disk. Since Streams will use > >> this config to decide if it needs to trigger a commit, state store > >> implementations that can spill to disk will never be able to spill to > >> disk. You would only need to change the doc of the config, if you agree > >> with me. > >> > >> 8. > >> Section "Transaction Management" about the wrappers is rather a > >> implementation detail that should not be in the KIP. > >> > >> 9. > >> Could you add a section that describes how failover will work with the > >> transactional state stores? I think section "Error handling" is already > >> a good start. > >> > >> > >> Best, > >> Bruno > >> > >> > >> > >> > >> On 15.05.23 11:04, Nick Telford wrote: > >>> Hi everyone, > >>> > >>> Quick update: I've added a new section to the KIP: "Offsets for > Consumer > >>> Rebalances", that outlines my solution to the problem that > >>> StreamsPartitionAssignor needs to read StateStore offsets even if > they're > >>> not currently open. > >>> > >>> Regards, > >>> Nick > >>> > >>> On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> > >> wrote: > >>> > >>>> Hi Bruno, > >>>> > >>>> Thanks for reviewing my proposal. > >>>> > >>>> 1. > >>>> The main reason I added it was because it was easy to do. If we see no > >>>> value in it, I can remove it. > >>>> > >>>> 2. > >>>> Global StateStores can have multiple partitions in their input topics > >>>> (which function as their changelogs), so they would have more than one > >>>> partition. > >>>> > >>>> 3. > >>>> That's a good point. At present, the only method it adds is > >>>> isolationLevel(), which is likely not necessary outside of > StateStores. > >>>> It *does* provide slightly different guarantees in the documentation > to > >>>> several of the methods (hence the overrides). I'm not sure if this is > >>>> enough to warrant a new interface though. > >>>> I think the question that remains is whether this interface makes it > >>>> easier to implement custom transactional StateStores than if we were > to > >>>> remove it? Probably not. > >>>> > >>>> 4. > >>>> The main motivation for the Atomic Checkpointing is actually > >> performance. > >>>> My team has been testing out an implementation of this KIP without it, > >> and > >>>> we had problems with RocksDB doing *much* more compaction, due to the > >>>> significantly increased flush rate. It was enough of a problem that > (for > >>>> the time being), we had to revert back to Kafka Streams proper. > >>>> I think the best way to solve this, as you say, is to keep the > >> .checkpoint > >>>> files *in addition* to the offsets being stored within the store > itself. 
> >>>> Essentially, when closing StateStores, we force a memtable flush, then > >>>> call getCommittedOffsets and write those out to the .checkpoint file. > >>>> That would ensure the metadata is available to the > >>>> StreamsPartitionAssignor for all closed stores. > >>>> If there's a crash (no clean close), then we won't be able to > guarantee > >>>> which offsets were flushed to disk by RocksDB, so we'd need to open ( > >>>> init()), read offsets, and then close() those stores. But since this > is > >>>> the exception, and will only occur once (provided it doesn't crash > every > >>>> time!), I think the performance impact here would be acceptable. > >>>> > >>>> Thanks for the feedback, please let me know if you have any more > >> comments > >>>> or questions! > >>>> > >>>> I'm currently working on rebasing against trunk. This involves adding > >>>> support for transactionality to VersionedStateStores. I will probably > >> need > >>>> to revise my implementation for transactional "segmented" stores, both > >> to > >>>> accommodate VersionedStateStore, and to clean up some other stuff. > >>>> > >>>> Regards, > >>>> Nick > >>>> > >>>> > >>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> > wrote: > >>>> > >>>>> Hi Nick, > >>>>> > >>>>> Thanks for the updates! > >>>>> > >>>>> I have a couple of questions/comments. > >>>>> > >>>>> 1. > >>>>> Why do you propose a configuration that involves max. bytes and max. > >>>>> reords? I think we are mainly concerned about memory consumption > >> because > >>>>> we want to limit the off-heap memory used. I cannot think of a case > >>>>> where one would want to set the max. number of records. > >>>>> > >>>>> > >>>>> 2. > >>>>> Why does > >>>>> > >>>>> default void commit(final Map<TopicPartition, Long> > >> changelogOffsets) { > >>>>> flush(); > >>>>> } > >>>>> > >>>>> take a map of partitions to changelog offsets? > >>>>> The mapping between state stores to partitions is a 1:1 relationship. > >>>>> Passing in a single changelog offset should suffice. > >>>>> > >>>>> > >>>>> 3. > >>>>> Why do we need the Transaction interface? It should be possible to > hide > >>>>> beginning and committing a transactions withing the state store > >>>>> implementation, so that from outside the state store, it does not > >> matter > >>>>> whether the state store is transactional or not. What would be the > >>>>> advantage of using the Transaction interface? > >>>>> > >>>>> > >>>>> 4. > >>>>> Regarding checkpointing offsets, I think we should keep the > checkpoint > >>>>> file in any case for the reason you mentioned about rebalancing. Even > >> if > >>>>> that would not be an issue, I would propose to move the change to > >> offset > >>>>> management to a new KIP and to not add more complexity than needed to > >>>>> this one. I would not be too concerned about the consistency > violation > >>>>> you mention. As far as I understand, with transactional state stores > >>>>> Streams would write the checkpoint file during every commit even > under > >>>>> EOS. In the failure case you describe, Streams would restore the > state > >>>>> stores from the offsets found in the checkpoint file written during > the > >>>>> penultimate commit instead of during the last commit. Basically, > >> Streams > >>>>> would overwrite the records written to the state store between the > last > >>>>> two commits with the same records read from the changelogs. 
While I > >>>>> understand that this is wasteful, it is -- at the same time -- > >>>>> acceptable and most importantly it does not break EOS. > >>>>> > >>>>> Best, > >>>>> Bruno > >>>>> > >>>>> > >>>>> On 27.04.23 12:34, Nick Telford wrote: > >>>>>> Hi everyone, > >>>>>> > >>>>>> I find myself (again) considering removing the offset management > from > >>>>>> StateStores, and keeping the old checkpoint file system. The reason > is > >>>>> that > >>>>>> the StreamPartitionAssignor directly reads checkpoint files in order > >> to > >>>>>> determine which instance has the most up-to-date copy of the local > >>>>> state. > >>>>>> If we move offsets into the StateStore itself, then we will need to > >>>>> open, > >>>>>> initialize, read offsets and then close each StateStore (that is not > >>>>>> already assigned and open) for which we have *any* local state, on > >> every > >>>>>> rebalance. > >>>>>> > >>>>>> Generally, I don't think there are many "orphan" stores like this > >>>>> sitting > >>>>>> around on most instances, but even a few would introduce additional > >>>>> latency > >>>>>> to an already somewhat lengthy rebalance procedure. > >>>>>> > >>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping things > >> in > >>>>> the > >>>>>> checkpoint file(s) for now, and not worrying about the race. The > >>>>> downside > >>>>>> is that we wouldn't be able to remove the explicit RocksDB flush > >>>>> on-commit, > >>>>>> which likely hurts performance. > >>>>>> > >>>>>> If anyone has any thoughts or ideas on this subject, I would > >> appreciate > >>>>> it! > >>>>>> > >>>>>> Regards, > >>>>>> Nick > >>>>>> > >>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com> > >>>>> wrote: > >>>>>> > >>>>>>> Hi Colt, > >>>>>>> > >>>>>>> The issue is that if there's a crash between 2 and 3, then you > still > >>>>> end > >>>>>>> up with inconsistent data in RocksDB. The only way to guarantee > that > >>>>> your > >>>>>>> checkpoint offsets and locally stored data are consistent with each > >>>>> other > >>>>>>> are to atomically commit them, which can be achieved by having the > >>>>> offsets > >>>>>>> stored in RocksDB. > >>>>>>> > >>>>>>> The offsets column family is likely to be extremely small (one > >>>>>>> per-changelog partition + one per Topology input partition for > >> regular > >>>>>>> stores, one per input partition for global stores). So the overhead > >>>>> will be > >>>>>>> minimal. > >>>>>>> > >>>>>>> A major benefit of doing this is that we can remove the explicit > >> calls > >>>>> to > >>>>>>> db.flush(), which forcibly flushes memtables to disk on-commit. It > >>>>> turns > >>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka Streams > >>>>>>> commits, *not* RocksDB configuration, which could be a major source > >> of > >>>>>>> confusion. Atomic checkpointing makes it safe to remove these > >> explicit > >>>>>>> flushes, because it no longer matters exactly when RocksDB flushes > >>>>> data to > >>>>>>> disk; since the data and corresponding checkpoint offsets will > always > >>>>> be > >>>>>>> flushed together, the local store is always in a consistent state, > >> and > >>>>>>> on-restart, it can always safely resume restoration from the > on-disk > >>>>>>> offsets, restoring the small amount of data that hadn't been > flushed > >>>>> when > >>>>>>> the app exited/crashed. 
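
[A simplified sketch of that argument, using a plain WriteBatch and invented column-family names purely to show the record data and the changelog offset landing in a single atomic write; the KIP's actual write path goes through WriteBatchWithIndex:]

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

class AtomicOffsetCommitSketch {
    static void commit(final RocksDB db,
                       final ColumnFamilyHandle dataCf,
                       final ColumnFamilyHandle offsetsCf,
                       final byte[] key,
                       final byte[] value,
                       final String changelogTopicPartition,
                       final long committedOffset) throws RocksDBException {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions writeOptions = new WriteOptions()) {
            // The record(s) being committed.
            batch.put(dataCf, key, value);
            // The changelog offset, stored alongside the data it checkpoints.
            batch.put(offsetsCf,
                      changelogTopicPartition.getBytes(StandardCharsets.UTF_8),
                      ByteBuffer.allocate(Long.BYTES).putLong(committedOffset).array());
            // Both column families are updated in one atomic write, so whatever
            // RocksDB later flushes to disk is always self-consistent.
            db.write(writeOptions, batch);
        }
    }
}
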
> >>>>>>> > >>>>>>> Regards, > >>>>>>> Nick > >>>>>>> > >>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io> > >>>>> wrote: > >>>>>>> > >>>>>>>> Nick, > >>>>>>>> > >>>>>>>> Thanks for your reply. Ack to A) and B). > >>>>>>>> > >>>>>>>> For item C), I see what you're referring to. Your proposed > solution > >>>>> will > >>>>>>>> work, so no need to change it. What I was suggesting was that it > >>>>> might be > >>>>>>>> possible to achieve this with only one column family. So long as: > >>>>>>>> > >>>>>>>> - No uncommitted records (i.e. not committed to the > changelog) > >> are > >>>>>>>> *committed* to the state store, AND > >>>>>>>> - The Checkpoint offset (which refers to the changelog > topic) > >> is > >>>>> less > >>>>>>>> than or equal to the last written changelog offset in > rocksdb > >>>>>>>> > >>>>>>>> I don't see the need to do the full restoration from scratch. My > >>>>>>>> understanding was that prior to 844/892, full restorations were > >>>>> required > >>>>>>>> because there could be uncommitted records written to RocksDB; > >>>>> however, > >>>>>>>> given your use of RocksDB transactions, that can be avoided with > the > >>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB > >>>>> transaction, 3) > >>>>>>>> update offset in checkpoint file. > >>>>>>>> > >>>>>>>> Anyways, your proposed solution works equivalently and I don't > >> believe > >>>>>>>> there is much overhead to an additional column family in RocksDB. > >>>>> Perhaps > >>>>>>>> it may even perform better than making separate writes to the > >>>>> checkpoint > >>>>>>>> file. > >>>>>>>> > >>>>>>>> Colt McNealy > >>>>>>>> *Founder, LittleHorse.io* > >>>>>>>> > >>>>>>>> > >>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford < > >> nick.telf...@gmail.com> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Colt, > >>>>>>>>> > >>>>>>>>> A. I've done my best to de-couple the StateStore stuff from the > >> rest > >>>>> of > >>>>>>>> the > >>>>>>>>> Streams engine. The fact that there will be only one ongoing > >> (write) > >>>>>>>>> transaction at a time is not guaranteed by any API, and is just a > >>>>>>>>> consequence of the way Streams operates. To that end, I tried to > >>>>> ensure > >>>>>>>> the > >>>>>>>>> documentation and guarantees provided by the new APIs are > >>>>> independent of > >>>>>>>>> this incidental behaviour. In practice, you're right, this > >>>>> essentially > >>>>>>>>> refers to "interactive queries", which are technically "read > >>>>>>>> transactions", > >>>>>>>>> even if they don't actually use the transaction API to isolate > >>>>>>>> themselves. > >>>>>>>>> > >>>>>>>>> B. Yes, although not ideal. This is for backwards compatibility, > >>>>>>>> because: > >>>>>>>>> 1) Existing custom StateStore implementations will > implement > >>>>>>>> flush(), > >>>>>>>>> and not commit(), but the Streams engine now calls commit(), so > >> those > >>>>>>>> calls > >>>>>>>>> need to be forwarded to flush() for these legacy stores. > >>>>>>>>> 2) Existing StateStore *users*, i.e. outside of the > Streams > >>>>> engine > >>>>>>>>> itself, may depend on explicitly calling flush(), so for these > >> cases, > >>>>>>>>> flush() needs to be redirected to call commit(). > >>>>>>>>> If anyone has a better way to guarantee compatibility without > >>>>>>>> introducing > >>>>>>>>> this potential recursion loop, I'm open to changes! > >>>>>>>>> > >>>>>>>>> C. This is described in the "Atomic Checkpointing" section. 
> Offsets > >>>>> are > >>>>>>>>> stored in a separate RocksDB column family, which is guaranteed > to > >> be > >>>>>>>>> atomically flushed to disk with all other column families. The > >> issue > >>>>> of > >>>>>>>>> checkpoints being written to disk after commit causing > >> inconsistency > >>>>> if > >>>>>>>> it > >>>>>>>>> crashes in between is the reason why, under EOS, checkpoint files > >> are > >>>>>>>> only > >>>>>>>>> written on clean shutdown. This is one of the major causes of > "full > >>>>>>>>> restorations", so moving the offsets into a place where they can > be > >>>>>>>>> guaranteed to be atomically written with the data they checkpoint > >>>>>>>> allows us > >>>>>>>>> to write the checkpoint offsets *on every commit*, not just on > >> clean > >>>>>>>>> shutdown. > >>>>>>>>> > >>>>>>>>> Regards, > >>>>>>>>> Nick > >>>>>>>>> > >>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io> > >>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Nick, > >>>>>>>>>> > >>>>>>>>>> Thank you for continuing this work. I have a few minor > clarifying > >>>>>>>>>> questions. > >>>>>>>>>> > >>>>>>>>>> A) "Records written to any transaction are visible to all other > >>>>>>>>>> transactions immediately." I am confused here—I thought there > >> could > >>>>>>>> only > >>>>>>>>> be > >>>>>>>>>> one transaction going on at a time for a given state store given > >> the > >>>>>>>>>> threading model for processing records on a Task. Do you mean > >>>>>>>> Interactive > >>>>>>>>>> Queries by "other transactions"? (If so, then everything makes > >>>>> sense—I > >>>>>>>>>> thought that since IQ were read-only then they didn't count as > >>>>>>>>>> transactions). > >>>>>>>>>> > >>>>>>>>>> B) Is it intentional that the default implementations of the > >> flush() > >>>>>>>> and > >>>>>>>>>> commit() methods in the StateStore class refer to each other in > >> some > >>>>>>>> sort > >>>>>>>>>> of unbounded recursion? > >>>>>>>>>> > >>>>>>>>>> C) How will the getCommittedOffset() method work? At first I > >> thought > >>>>>>>> the > >>>>>>>>>> way to do it would be using a special key in the RocksDB store > to > >>>>>>>> store > >>>>>>>>> the > >>>>>>>>>> offset, and committing that with the transaction. But upon > second > >>>>>>>>> thought, > >>>>>>>>>> since restoration from the changelog is an idempotent > procedure, I > >>>>>>>> think > >>>>>>>>> it > >>>>>>>>>> would be fine to 1) commit the RocksDB transaction and then 2) > >> write > >>>>>>>> the > >>>>>>>>>> offset to disk in a checkpoint file. If there is a crash between > >> 1) > >>>>>>>> and > >>>>>>>>> 2), > >>>>>>>>>> I think the only downside is now we replay a few more records > (at > >> a > >>>>>>>> cost > >>>>>>>>> of > >>>>>>>>>> <100ms). Am I missing something there? > >>>>>>>>>> > >>>>>>>>>> Other than that, everything makes sense to me. 
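
[A sketch of the mutual forwarding Nick describes in (B), with an invented interface name; only the commit(Map<TopicPartition, Long>) signature comes from the KIP discussion. Legacy implementations override flush(), new ones override commit(), and a store that overrides neither would indeed recurse, which is the loop Colt asks about.]

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

interface ForwardingDefaultsSketch {

    default void flush() {
        // Legacy *callers* of flush() are redirected to the new commit().
        commit(Collections.emptyMap());
    }

    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        // Legacy *implementations* only override flush(), so the engine's
        // calls to commit() fall through to their existing flush() logic.
        flush();
    }
}
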
> >>>>>>>>>> > >>>>>>>>>> Cheers, > >>>>>>>>>> Colt McNealy > >>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford < > >>>>> nick.telf...@gmail.com> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi everyone, > >>>>>>>>>>> > >>>>>>>>>>> I've updated the KIP to reflect the latest version of the > design: > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>> > >>>>>>>>>>> There are several changes in there that reflect feedback from > >> this > >>>>>>>>>> thread, > >>>>>>>>>>> and there's a new section and a bunch of interface changes > >> relating > >>>>>>>> to > >>>>>>>>>>> Atomic Checkpointing, which is the final piece of the puzzle to > >>>>>>>> making > >>>>>>>>>>> everything robust. > >>>>>>>>>>> > >>>>>>>>>>> Let me know what you think! > >>>>>>>>>>> > >>>>>>>>>>> Regards, > >>>>>>>>>>> Nick > >>>>>>>>>>> > >>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford < > >> nick.telf...@gmail.com> > >>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Lucas, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for looking over my KIP. > >>>>>>>>>>>> > >>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a typo in > >> the > >>>>>>>>> KIP > >>>>>>>>>>>> that I've now corrected. It was originally per-Task, but I > >>>>>>>> changed it > >>>>>>>>>> to > >>>>>>>>>>>> per-instance for exactly the reason you highlighted. > >>>>>>>>>>>> B) It's worth noting that transactionality is only enabled > under > >>>>>>>> EOS, > >>>>>>>>>> and > >>>>>>>>>>>> in the default mode of operation (ALOS), there should be no > >>>>>>>> change in > >>>>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the > impact > >> on > >>>>>>>>>> users > >>>>>>>>>>> by > >>>>>>>>>>>> sufficiently low default values for the memory bound > >>>>>>>> configuration. I > >>>>>>>>>>>> understand your hesitation to include a significant change of > >>>>>>>>>> behaviour, > >>>>>>>>>>>> especially in a minor release, but I suspect that most users > >> will > >>>>>>>>>> prefer > >>>>>>>>>>>> the memory impact (under EOS) to the existing behaviour of > >>>>>>>> frequent > >>>>>>>>>> state > >>>>>>>>>>>> restorations! If this is a problem, the changes can wait until > >> the > >>>>>>>>> next > >>>>>>>>>>>> major release. I'll be running a patched version of streams in > >>>>>>>>>> production > >>>>>>>>>>>> with these changes as soon as they're ready, so it won't > disrupt > >>>>>>>> me > >>>>>>>>> :-D > >>>>>>>>>>>> C) The main purpose of this sentence was just to note that > some > >>>>>>>>> changes > >>>>>>>>>>>> will need to be made to the way Segments are handled in order > to > >>>>>>>>> ensure > >>>>>>>>>>>> they also benefit from transactions. At the time I wrote it, I > >>>>>>>> hadn't > >>>>>>>>>>>> figured out the specific changes necessary, so it was > >> deliberately > >>>>>>>>>> vague. > >>>>>>>>>>>> This is the one outstanding problem I'm currently working on, > >> and > >>>>>>>>> I'll > >>>>>>>>>>>> update this section with more detail once I have figured out > the > >>>>>>>>> exact > >>>>>>>>>>>> changes required. > >>>>>>>>>>>> D) newTransaction() provides the necessary isolation > guarantees. > >>>>>>>>> While > >>>>>>>>>>>> the RocksDB implementation of transactions doesn't technically > >>>>>>>> *need* > >>>>>>>>>>>> read-only users to call newTransaction(), other > implementations > >>>>>>>>> (e.g. 
a > >>>>>>>>>>>> hypothetical PostgresStore) may require it. Calling > >>>>>>>> newTransaction() > >>>>>>>>>> when > >>>>>>>>>>>> no transaction is necessary is essentially free, as it will > just > >>>>>>>>> return > >>>>>>>>>>>> this. > >>>>>>>>>>>> > >>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think it > >>>>>>>> should > >>>>>>>>> be > >>>>>>>>>>>> fairly obvious where the performance problems stem from: > writes > >>>>>>>> under > >>>>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with the > >>>>>>>>>>>> tombstone/record flag, 1 to decode it from the > tombstone/record > >>>>>>>> flag, > >>>>>>>>>>> and 1 > >>>>>>>>>>>> to copy the record from the "temporary" store to the "main" > >> store, > >>>>>>>>> when > >>>>>>>>>>> the > >>>>>>>>>>>> transaction commits. The different approach taken by KIP-869 > >>>>>>>> should > >>>>>>>>>>> perform > >>>>>>>>>>>> much better, as it avoids all these copies, and may actually > >>>>>>>> perform > >>>>>>>>>>>> slightly better than trunk, due to batched writes in RocksDB > >>>>>>>>> performing > >>>>>>>>>>>> better than non-batched writes.[1] > >>>>>>>>>>>> > >>>>>>>>>>>> Regards, > >>>>>>>>>>>> Nick > >>>>>>>>>>>> > >>>>>>>>>>>> 1: > >>>>>>>>>>> > >>>>>>>>> > >>>>> > >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy < > >>>>>>>> lbruts...@confluent.io > >>>>>>>>>>> .invalid> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I'm just starting to read up on the whole discussion about > >>>>>>>> KIP-892 > >>>>>>>>> and > >>>>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think > >>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do have > some > >>>>>>>>>>>>> questions about the latest draft. > >>>>>>>>>>>>> > >>>>>>>>>>>>> A) If I understand correctly, you propose to put a bound > on > >> the > >>>>>>>>>>>>> (native) memory consumed by each task. However, I wonder if > >> this > >>>>>>>> is > >>>>>>>>>>>>> sufficient if we have temporary imbalances in the cluster. > For > >>>>>>>>>>>>> example, depending on the timing of rebalances during a > cluster > >>>>>>>>>>>>> restart, it could happen that a single streams node is > >> assigned a > >>>>>>>>> lot > >>>>>>>>>>>>> more tasks than expected. With your proposed change, this > would > >>>>>>>> mean > >>>>>>>>>>>>> that the memory required by this one node could be a multiple > >> of > >>>>>>>>> what > >>>>>>>>>>>>> is required during normal operation. I wonder if it wouldn't > be > >>>>>>>>> safer > >>>>>>>>>>>>> to put a global bound on the memory use, across all tasks. > >>>>>>>>>>>>> B) Generally, the memory concerns still give me the > feeling > >>>>> that > >>>>>>>>> this > >>>>>>>>>>>>> should not be enabled by default for all users in a minor > >>>>>>>> release. > >>>>>>>>>>>>> C) In section "Transaction Management": the sentence "A > >> similar > >>>>>>>>>>>>> analogue will be created to automatically manage `Segment` > >>>>>>>>>>>>> transactions.". Maybe this is just me lacking some > background, > >>>>>>>> but I > >>>>>>>>>>>>> do not understand this, it would be great if you could > clarify > >>>>>>>> what > >>>>>>>>>>>>> you mean here. > >>>>>>>>>>>>> D) Could you please clarify why IQ has to call > >>>>> newTransaction(), > >>>>>>>>> when > >>>>>>>>>>>>> it's read-only. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> And one last thing not strictly related to your KIP: if there > >> is > >>>>>>>> an > >>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x > slower > >>>>>>>> (e.g. > >>>>>>>>>>>>> by providing a flame graph), that would be quite interesting. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>> Lucas > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford < > >>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I've updated the KIP with a more detailed design, which > >>>>>>>> reflects > >>>>>>>>> the > >>>>>>>>>>>>>> implementation I've been working on: > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> This new design should address the outstanding points > already > >>>>>>>> made > >>>>>>>>>> in > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> thread. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Please let me know if there are areas that are unclear or > need > >>>>>>>>> more > >>>>>>>>>>>>>> clarification. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I have a (nearly) working implementation. I'm confident that > >>>>>>>> the > >>>>>>>>>>>>> remaining > >>>>>>>>>>>>>> work (making Segments behave) will not impact the documented > >>>>>>>>> design. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy < > >> c...@littlehorse.io > >>>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was hoping > that, > >>>>>>>>>> since > >>>>>>>>>>>>> reading > >>>>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the > >>>>>>>> documented > >>>>>>>>>> API, > >>>>>>>>>>>>> maybe > >>>>>>>>>>>>>>> you *wouldn't* have to wait for the next major release to > >>>>>>>> make > >>>>>>>>>> that > >>>>>>>>>>>>> change; > >>>>>>>>>>>>>>> but given that it would be considered a major change, I > like > >>>>>>>>> your > >>>>>>>>>>>>> approach > >>>>>>>>>>>>>>> the best. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding! > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford < > >>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended to > >>>>>>>> try > >>>>>>>>> to > >>>>>>>>>>>>> keep the > >>>>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise we'd > >>>>>>>> have > >>>>>>>>> to > >>>>>>>>>>>>> wait for > >>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>> major version release to land these changes. > >>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of > >>>>>>>> guarantee, > >>>>>>>>>> and > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> typically longer commit intervals would be problematic > when > >>>>>>>>>>> reading > >>>>>>>>>>>>> only > >>>>>>>>>>>>>>>> "committed" records. 
> >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor surgery, > >>>>>>>> but I > >>>>>>>>>>>>> spent a > >>>>>>>>>>>>>>>> considerable amount of that time working through ideas for > >>>>>>>>>>> possible > >>>>>>>>>>>>>>>> solutions in my head. I think your suggestion of keeping > >>>>>>>> ALOS > >>>>>>>>>>>>> as-is, but > >>>>>>>>>>>>>>>> buffering writes for EOS is the right path forwards, > >>>>>>>> although > >>>>>>>>> I > >>>>>>>>>>>>> have a > >>>>>>>>>>>>>>>> solution that both expands on this, and provides for some > >>>>>>>> more > >>>>>>>>>>>>> formal > >>>>>>>>>>>>>>>> guarantees. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for > >>>>>>>>>> "Transactions", > >>>>>>>>>>>>> with > >>>>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read Committed" > >>>>>>>> when > >>>>>>>>>> under > >>>>>>>>>>>>> EOS, > >>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> "Read Uncommitted" under ALOS. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> The nice thing about this approach is that it gives us > much > >>>>>>>>> more > >>>>>>>>>>>>> clearly > >>>>>>>>>>>>>>>> defined isolation behaviour that can be properly > >>>>>>>> documented to > >>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>> users > >>>>>>>>>>>>>>>> know what to expect. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and will > >>>>>>>> update > >>>>>>>>>> the > >>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>> I have something. The main struggle is trying to implement > >>>>>>>>> this > >>>>>>>>>>>>> without > >>>>>>>>>>>>>>>> making any major changes to the existing interfaces or > >>>>>>>>> breaking > >>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>> implementations, because currently everything expects to > >>>>>>>>> operate > >>>>>>>>>>>>> directly > >>>>>>>>>>>>>>>> on a StateStore, and not a Transaction of that store. I > >>>>>>>> think > >>>>>>>>>> I'm > >>>>>>>>>>>>> getting > >>>>>>>>>>>>>>>> close, although sadly I won't be able to progress much > >>>>>>>> until > >>>>>>>>>> next > >>>>>>>>>>>>> week > >>>>>>>>>>>>>>> due > >>>>>>>>>>>>>>>> to some work commitments. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy < > >>>>>>>>> c...@littlehorse.io> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thank you for the explanation, and also for the updated > >>>>>>>>> KIP. I > >>>>>>>>>>> am > >>>>>>>>>>>>> quite > >>>>>>>>>>>>>>>>> eager for this improvement to be released as it would > >>>>>>>>> greatly > >>>>>>>>>>>>> reduce > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> operational difficulties of EOS streams apps. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Two questions: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 10) > >>>>>>>>>>>>>>>>>> When reading records, we will use the > >>>>>>>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB > >>>>>>>>>>>>>>>>> and WriteBatchWithIndex#newIteratorWithBase > utilities in > >>>>>>>>>> order > >>>>>>>>>>> to > >>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>> that uncommitted writes are available to query. > >>>>>>>>>>>>>>>>> Why do extra work to enable the reading of uncommitted > >>>>>>>>> writes > >>>>>>>>>>>>> during > >>>>>>>>>>>>>>> IQ? 
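
[A small sketch of the read path in the quoted KIP text, using the org.rocksdb Java API; the wrapper class and method names are invented. Point lookups consult the uncommitted batch before the underlying DB, and range scans overlay the batch's index on a normal DB iterator.]

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchWithIndex;

class UncommittedReadSketch {

    static byte[] get(final RocksDB db,
                      final WriteBatchWithIndex uncommitted,
                      final byte[] key) throws RocksDBException {
        try (ReadOptions readOptions = new ReadOptions()) {
            // Checks the uncommitted batch first, falling back to the DB.
            return uncommitted.getFromBatchAndDB(db, readOptions, key);
        }
    }

    static RocksIterator range(final RocksDB db,
                               final WriteBatchWithIndex uncommitted) {
        // Merges the batch's index with an iterator over the underlying DB.
        return uncommitted.newIteratorWithBase(db.newIterator());
    }
}
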
> >>>>>>>>>>>>>>>>> Code complexity aside, reading uncommitted writes is, in > >>>>>>>> my > >>>>>>>>>>>>> opinion, a > >>>>>>>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have the > >>>>>>>>>>> guarantee > >>>>>>>>>>>>> that, > >>>>>>>>>>>>>>>>> with EOS, IQ only reads committed records. In order to > >>>>>>>> avoid > >>>>>>>>>>> dirty > >>>>>>>>>>>>>>> reads, > >>>>>>>>>>>>>>>>> one currently must query a standby replica (but this > >>>>>>>> still > >>>>>>>>>>> doesn't > >>>>>>>>>>>>>>> fully > >>>>>>>>>>>>>>>>> guarantee monotonic reads). > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 20) Is it also necessary to enable this optimization on > >>>>>>>> ALOS > >>>>>>>>>>>>> stores? > >>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>> motivation of KIP-844 was mainly to reduce the need to > >>>>>>>>> restore > >>>>>>>>>>>>> state > >>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>> scratch on unclean EOS shutdowns; with ALOS it was > >>>>>>>>> acceptable > >>>>>>>>>> to > >>>>>>>>>>>>> accept > >>>>>>>>>>>>>>>>> that there may have been uncommitted writes on disk. On a > >>>>>>>>> side > >>>>>>>>>>>>> note, if > >>>>>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>> enable this type of store on ALOS processors, the > >>>>>>>> community > >>>>>>>>>>> would > >>>>>>>>>>>>>>>>> definitely want to enable queries on dirty reads; > >>>>>>>> otherwise > >>>>>>>>>>> users > >>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>> have to wait 30 seconds (default) to see an update. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thank you for doing this fantastic work! > >>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford < > >>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no > >>>>>>>>> longer > >>>>>>>>>>>>> include > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> StateStore management of checkpointing. This can be > >>>>>>>> added > >>>>>>>>>> as a > >>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>> later > >>>>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>>>> to further optimize the consistency and performance of > >>>>>>>>> state > >>>>>>>>>>>>> stores. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I've also added a section discussing some of the > >>>>>>>> concerns > >>>>>>>>>>> around > >>>>>>>>>>>>>>>>>> concurrency, especially in the presence of Iterators. > >>>>>>>> I'm > >>>>>>>>>>>>> thinking of > >>>>>>>>>>>>>>>>>> wrapping WriteBatchWithIndex with a reference-counting > >>>>>>>>>>>>> copy-on-write > >>>>>>>>>>>>>>>>>> implementation (that only makes a copy if there's an > >>>>>>>>> active > >>>>>>>>>>>>>>> iterator), > >>>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>> I'm open to suggestions. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford < > >>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I didn't do any profiling, but the 844 > >>>>>>>> implementation: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> - Writes uncommitted records to a temporary > >>>>>>>> RocksDB > >>>>>>>>>>>>> instance > >>>>>>>>>>>>>>>>>>> - Since tombstones need to be flagged, all > >>>>>>>> record > >>>>>>>>>>>>> values are > >>>>>>>>>>>>>>>>>>> prefixed with a value/tombstone marker. 
This > >>>>>>>>>>>>> necessitates a > >>>>>>>>>>>>>>>>> memory > >>>>>>>>>>>>>>>>>> copy. > >>>>>>>>>>>>>>>>>>> - On-commit, iterates all records in this > >>>>>>>> temporary > >>>>>>>>>>>>> instance and > >>>>>>>>>>>>>>>>>>> writes them to the main RocksDB store. > >>>>>>>>>>>>>>>>>>> - While iterating, the value/tombstone marker > >>>>>>>> needs > >>>>>>>>> to > >>>>>>>>>> be > >>>>>>>>>>>>> parsed > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> the real value extracted. This necessitates > >>>>>>>> another > >>>>>>>>>>> memory > >>>>>>>>>>>>> copy. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> My guess is that the cost of iterating the temporary > >>>>>>>>>> RocksDB > >>>>>>>>>>>>> store > >>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> major factor, with the 2 extra memory copies > >>>>>>>> per-Record > >>>>>>>>>>>>>>> contributing > >>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>> significant amount too. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:12, Colt McNealy < > >>>>>>>>>>>>> c...@littlehorse.io> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Out of curiosity, why does the performance of the > >>>>>>>> store > >>>>>>>>>>>>> degrade so > >>>>>>>>>>>>>>>>>>>> significantly with the 844 implementation? I > >>>>>>>> wouldn't > >>>>>>>>> be > >>>>>>>>>>> too > >>>>>>>>>>>>>>>> surprised > >>>>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>> 50-60% drop (caused by each record being written > >>>>>>>>> twice), > >>>>>>>>>>> but > >>>>>>>>>>>>> 96% > >>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>> extreme. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> The only thing I can think of which could create > >>>>>>>> such a > >>>>>>>>>>>>> bottleneck > >>>>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>> that perhaps the 844 implementation deserializes and > >>>>>>>>> then > >>>>>>>>>>>>>>>>> re-serializes > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> store values when copying from the uncommitted to > >>>>>>>>>> committed > >>>>>>>>>>>>> store, > >>>>>>>>>>>>>>>>> but I > >>>>>>>>>>>>>>>>>>>> wasn't able to figure that out when I scanned the > >>>>>>>> PR. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford < > >>>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> I've updated the KIP to resolve all the points > >>>>>>>> that > >>>>>>>>>> have > >>>>>>>>>>>>> been > >>>>>>>>>>>>>>>> raised > >>>>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>>>> far, with one exception: the ALOS default commit > >>>>>>>>>> interval > >>>>>>>>>>>>> of 5 > >>>>>>>>>>>>>>>>> minutes > >>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>> likely to cause WriteBatchWithIndex memory to grow > >>>>>>>>> too > >>>>>>>>>>>>> large. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> There's a couple of different things I can think > >>>>>>>> of > >>>>>>>>> to > >>>>>>>>>>>>> solve > >>>>>>>>>>>>>>> this: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> - We already have a memory/record limit in the > >>>>>>>> KIP > >>>>>>>>>> to > >>>>>>>>>>>>> prevent > >>>>>>>>>>>>>>>> OOM > >>>>>>>>>>>>>>>>>>>>> errors. 
Should we choose a default value for > >>>>>>>>> these? > >>>>>>>>>> My > >>>>>>>>>>>>>>> concern > >>>>>>>>>>>>>>>>> here > >>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>> anything we choose might seem rather > >>>>>>>> arbitrary. We > >>>>>>>>>>> could > >>>>>>>>>>>>>>> change > >>>>>>>>>>>>>>>>>>>>> its behaviour such that under ALOS, it only > >>>>>>>>> triggers > >>>>>>>>>>> the > >>>>>>>>>>>>>>> commit > >>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> StateStore, but under EOS, it triggers a > >>>>>>>> commit of > >>>>>>>>>> the > >>>>>>>>>>>>> Kafka > >>>>>>>>>>>>>>>>>>>>> transaction. > >>>>>>>>>>>>>>>>>>>>> - We could introduce a separate ` > >>>>>>>>>>> checkpoint.interval.ms` > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> allow > >>>>>>>>>>>>>>>>>>>> ALOS > >>>>>>>>>>>>>>>>>>>>> to commit the StateStores more frequently than > >>>>>>>> the > >>>>>>>>>>>>> general > >>>>>>>>>>>>>>>>>>>>> commit.interval.ms? My concern here is that > >>>>>>>> the > >>>>>>>>>>>>> semantics of > >>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>> config > >>>>>>>>>>>>>>>>>>>>> would depend on the processing.mode; under > >>>>>>>> ALOS it > >>>>>>>>>>> would > >>>>>>>>>>>>>>> allow > >>>>>>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>> frequently committing stores, whereas under > >>>>>>>> EOS it > >>>>>>>>>>>>> couldn't. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Any better ideas? > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 16:25, Nick Telford < > >>>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi Alex, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I've updated the discussion of OOM issues by > >>>>>>>>>> describing > >>>>>>>>>>>>> how > >>>>>>>>>>>>>>>> we'll > >>>>>>>>>>>>>>>>>>>> handle > >>>>>>>>>>>>>>>>>>>>>> it. Here's the new text: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> To mitigate this, we will automatically force a > >>>>>>>>> Task > >>>>>>>>>>>>> commit if > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> total > >>>>>>>>>>>>>>>>>>>>>>> uncommitted records returned by > >>>>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedEntries() > >>>>>>>>>>> exceeds a > >>>>>>>>>>>>>>>>> threshold, > >>>>>>>>>>>>>>>>>>>>>>> configured by > >>>>>>>>>> max.uncommitted.state.entries.per.task; > >>>>>>>>>>>>> or the > >>>>>>>>>>>>>>>>> total > >>>>>>>>>>>>>>>>>>>>>>> memory used for buffering uncommitted records > >>>>>>>>>> returned > >>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes() > >>>>>>>>> exceeds > >>>>>>>>>>> the > >>>>>>>>>>>>>>>> threshold > >>>>>>>>>>>>>>>>>>>>>>> configured by > >>>>>>>>> max.uncommitted.state.bytes.per.task. > >>>>>>>>>>>>> This will > >>>>>>>>>>>>>>>>>> roughly > >>>>>>>>>>>>>>>>>>>>>>> bound the memory required per-Task for > >>>>>>>> buffering > >>>>>>>>>>>>> uncommitted > >>>>>>>>>>>>>>>>>> records, > >>>>>>>>>>>>>>>>>>>>>>> irrespective of the commit.interval.ms, and > >>>>>>>> will > >>>>>>>>>>>>> effectively > >>>>>>>>>>>>>>>>> bound > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> number of records that will need to be > >>>>>>>> restored in > >>>>>>>>>> the > >>>>>>>>>>>>> event > >>>>>>>>>>>>>>>> of a > >>>>>>>>>>>>>>>>>>>>> failure. 
> >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> These limits will be checked in > >>>>>>>> StreamTask#process > >>>>>>>>>> and > >>>>>>>>>>> a > >>>>>>>>>>>>>>>> premature > >>>>>>>>>>>>>>>>>>>> commit > >>>>>>>>>>>>>>>>>>>>>>> will be requested via Task#requestCommit(). > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Note that these new methods provide default > >>>>>>>>>>>>> implementations > >>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>>>>>>>> existing custom stores and non-transactional > >>>>>>>>> stores > >>>>>>>>>>>>> (e.g. > >>>>>>>>>>>>>>>>>>>>>>> InMemoryKeyValueStore) do not force any early > >>>>>>>>>> commits. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I've chosen to have the StateStore expose > >>>>>>>>>>> approximations > >>>>>>>>>>>>> of > >>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>> buffer > >>>>>>>>>>>>>>>>>>>>>> size/count instead of opaquely requesting a > >>>>>>>> commit > >>>>>>>>> in > >>>>>>>>>>>>> order to > >>>>>>>>>>>>>>>>>>>> delegate > >>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> decision making to the Task itself. This enables > >>>>>>>>>> Tasks > >>>>>>>>>>>>> to look > >>>>>>>>>>>>>>>> at > >>>>>>>>>>>>>>>>>>>> *all* > >>>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>> their StateStores, and determine whether an > >>>>>>>> early > >>>>>>>>>>> commit > >>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>> necessary. > >>>>>>>>>>>>>>>>>>>>>> Notably, it enables pre-Task thresholds, > >>>>>>>> instead of > >>>>>>>>>>>>> per-Store, > >>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>>>>> prevents Tasks with many StateStores from using > >>>>>>>>> much > >>>>>>>>>>> more > >>>>>>>>>>>>>>> memory > >>>>>>>>>>>>>>>>>> than > >>>>>>>>>>>>>>>>>>>>> Tasks > >>>>>>>>>>>>>>>>>>>>>> with one StateStore. This makes sense, since > >>>>>>>>> commits > >>>>>>>>>>> are > >>>>>>>>>>>>> done > >>>>>>>>>>>>>>>>>> by-Task, > >>>>>>>>>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>>>> by-Store. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Prizes* for anyone who can come up with a better > >>>>>>>>> name > >>>>>>>>>>>>> for the > >>>>>>>>>>>>>>>> new > >>>>>>>>>>>>>>>>>>>> config > >>>>>>>>>>>>>>>>>>>>>> properties! > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Thanks for pointing out the potential > >>>>>>>> performance > >>>>>>>>>>> issues > >>>>>>>>>>>>> of > >>>>>>>>>>>>>>>> WBWI. > >>>>>>>>>>>>>>>>>> From > >>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> benchmarks that user posted[1], it looks like > >>>>>>>> WBWI > >>>>>>>>>>> still > >>>>>>>>>>>>>>>> performs > >>>>>>>>>>>>>>>>>>>>>> considerably better than individual puts, which > >>>>>>>> is > >>>>>>>>>> the > >>>>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>>>> design, > >>>>>>>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>>>>> I'd actually expect a performance boost from > >>>>>>>> WBWI, > >>>>>>>>>> just > >>>>>>>>>>>>> not as > >>>>>>>>>>>>>>>>> great > >>>>>>>>>>>>>>>>>>>> as > >>>>>>>>>>>>>>>>>>>>>> we'd get from a plain WriteBatch. 
This does > >>>>>>>> suggest > >>>>>>>>>>> that > >>>>>>>>>>>>> a > >>>>>>>>>>>>>>> good > >>>>>>>>>>>>>>>>>>>>>> optimization would be to use a regular > >>>>>>>> WriteBatch > >>>>>>>>> for > >>>>>>>>>>>>>>>> restoration > >>>>>>>>>>>>>>>>>> (in > >>>>>>>>>>>>>>>>>>>>>> RocksDBStore#restoreBatch), since we know that > >>>>>>>>> those > >>>>>>>>>>>>> records > >>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>> never > >>>>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>> queried before they're committed. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 1: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>> > >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> * Just kidding, no prizes, sadly. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> On Wed, 23 Nov 2022 at 12:28, Alexander > >>>>>>>> Sorokoumov > >>>>>>>>>>>>>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Hey Nick, > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Thank you for the KIP! With such a significant > >>>>>>>>>>>>> performance > >>>>>>>>>>>>>>>>>>>> degradation > >>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>> the secondary store approach, we should > >>>>>>>> definitely > >>>>>>>>>>>>> consider > >>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex. I also like encapsulating > >>>>>>>>>>>>> checkpointing > >>>>>>>>>>>>>>>>> inside > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> default state store implementation to improve > >>>>>>>>>>>>> performance. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> +1 to John's comment to keep the current > >>>>>>>>>> checkpointing > >>>>>>>>>>>>> as a > >>>>>>>>>>>>>>>>>> fallback > >>>>>>>>>>>>>>>>>>>>>>> mechanism. We want to keep existing users' > >>>>>>>>> workflows > >>>>>>>>>>>>> intact > >>>>>>>>>>>>>>> if > >>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>> can. A > >>>>>>>>>>>>>>>>>>>>>>> non-intrusive way would be to add a separate > >>>>>>>>>>> StateStore > >>>>>>>>>>>>>>> method, > >>>>>>>>>>>>>>>>>> say, > >>>>>>>>>>>>>>>>>>>>>>> StateStore#managesCheckpointing(), that > >>>>>>>> controls > >>>>>>>>>>>>> whether the > >>>>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>>>> store > >>>>>>>>>>>>>>>>>>>>>>> implementation owns checkpointing. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> I think that a solution to the transactional > >>>>>>>>> writes > >>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>> address > >>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> OOMEs. One possible way to address that is to > >>>>>>>> wire > >>>>>>>>>>>>>>> StateStore's > >>>>>>>>>>>>>>>>>>>> commit > >>>>>>>>>>>>>>>>>>>>>>> request by adding, say, StateStore#commitNeeded > >>>>>>>>> that > >>>>>>>>>>> is > >>>>>>>>>>>>>>> checked > >>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>>>>> StreamTask#commitNeeded via the corresponding > >>>>>>>>>>>>>>>>>> ProcessorStateManager. > >>>>>>>>>>>>>>>>>>>>> With > >>>>>>>>>>>>>>>>>>>>>>> that change, RocksDBStore will have to track > >>>>>>>> the > >>>>>>>>>>> current > >>>>>>>>>>>>>>>>>> transaction > >>>>>>>>>>>>>>>>>>>>> size > >>>>>>>>>>>>>>>>>>>>>>> and request a commit when the size goes over a > >>>>>>>>>>>>> (configurable) > >>>>>>>>>>>>>>>>>>>> threshold. 
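
[A hypothetical sketch of the threshold-driven early commit being discussed here; apart from approximateNumUncommittedBytes(), which is the method the KIP proposes, every name below is illustrative:]

import java.util.List;

class EarlyCommitCheckSketch {

    interface UncommittedBytesEstimate {
        long approximateNumUncommittedBytes();
    }

    // The task (or instance) sums the estimates from all of its stores and
    // forces a commit once the configured bound is exceeded, bounding memory
    // irrespective of commit.interval.ms.
    static boolean commitNeeded(final List<UncommittedBytesEstimate> stores,
                                final long maxUncommittedBytes) {
        long total = 0;
        for (final UncommittedBytesEstimate store : stores) {
            total += store.approximateNumUncommittedBytes();
        }
        return total >= maxUncommittedBytes;
    }
}
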
> > AFAIU WriteBatchWithIndex might perform significantly slower than non-txn puts as the batch size grows [1]. We should have a configuration to fall back to the current behavior (and/or disable txn stores for ALOS) unless the benchmarks show negligible overhead for longer commits / large-enough batch sizes.
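> > Such a switch could be as simple as the following; the property name here is made up purely for illustration:
> >
> >     import java.util.Map;
> >
> >     // Sketch of a possible fall-back flag; if it resolves to false, stores
> >     // write straight to RocksDB exactly as they do today.
> >     class StoreWriteMode {
> >         static final String TXN_STORES_ENABLED = "statestore.transactional.enabled"; // placeholder name
> >
> >         static boolean useWriteBatchWithIndex(final Map<String, Object> config, final boolean eosEnabled) {
> >             final boolean enabled = Boolean.parseBoolean(
> >                 String.valueOf(config.getOrDefault(TXN_STORES_ENABLED, true)));
> >             // One option: always stay transactional under EOS (that's what the
> >             // KIP is for) and make it a plain opt-in/opt-out flag under ALOS.
> >             return eosEnabled || enabled;
> >         }
> >     }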
> > If you prefer to keep the KIP smaller, I would rather cut out state-store-managed checkpointing than proper OOME handling and the ability to switch to non-txn behavior. The checkpointing is not necessary to solve the recovery-under-EOS problem. On the other hand, once WriteBatchWithIndex is in, it will be much easier to add state-store-managed checkpointing.
> >
> > If you share the current implementation, I am happy to help you address the OOME and configuration parts as well as review and test the patch.
> >
> > Best,
> > Alex
> >
> > 1. https://github.com/facebook/rocksdb/issues/608
> >
> > On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com> wrote:
> >
> > > Hi John,
> > >
> > > Thanks for the review and feedback!
> > >
> > > 1. Custom Stores: I've been mulling over this problem myself. As it stands, custom stores would essentially lose checkpointing with no indication that they're expected to make changes, besides a line in the release notes. I agree that the best solution would be to provide a default that checkpoints to a file. The one thing I would change is that the checkpointing is to a store-local file, instead of a per-Task file. This way the StateStore still technically owns its own checkpointing (via a default implementation), and the StateManager/Task execution engine doesn't need to know anything about checkpointing, which greatly simplifies some of the logic.
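> > > As a rough sketch of what that default could look like (the file name and method signature are illustrative only, not part of the KIP):
> > >
> > >     import java.io.IOException;
> > >     import java.nio.charset.StandardCharsets;
> > >     import java.nio.file.Files;
> > >     import java.nio.file.Path;
> > >     import java.nio.file.StandardCopyOption;
> > >     import java.util.Map;
> > >
> > >     // A default the StateStore interface could provide: write the changelog
> > >     // offsets to a small file inside the store's own directory, so the
> > >     // Task/StateManager never has to know how (or whether) the store
> > >     // checkpoints.
> > >     interface SelfCheckpointingStore {
> > >         Path stateDir(); // the store-local directory
> > >
> > >         default void checkpoint(final Map<String, Long> changelogOffsets) throws IOException {
> > >             final StringBuilder contents = new StringBuilder();
> > >             for (final Map.Entry<String, Long> entry : changelogOffsets.entrySet()) {
> > >                 contents.append(entry.getKey()).append(' ').append(entry.getValue()).append('\n');
> > >             }
> > >             // Write to a temp file first, then atomically move it into place.
> > >             final Path tmp = stateDir().resolve(".checkpoint.tmp");
> > >             Files.write(tmp, contents.toString().getBytes(StandardCharsets.UTF_8));
> > >             Files.move(tmp, stateDir().resolve(".checkpoint"), StandardCopyOption.ATOMIC_MOVE);
> > >         }
> > >     }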
> > > 2. OOME errors: The main reasons why I didn't explore a solution to this are a) to keep this KIP as simple as possible, and b) because I'm not exactly sure how to signal that a Task should commit prematurely. I'm confident it's possible, and I think it's worth adding a section on handling this. Besides my proposal to force an early commit once memory usage reaches a threshold, is there any other approach that you might suggest for tackling this problem?
> > >
> > > 3. ALOS: I can add in an explicit paragraph, but my assumption is that, since transactional behaviour comes at little/no cost, it should be available by default on all stores, irrespective of the processing mode. While ALOS doesn't use transactions, the Task itself still "commits", so the behaviour should be correct under ALOS too. I'm not convinced that it's worth having both transactional/non-transactional stores available, as it would considerably increase the complexity of the codebase, for very little benefit.
> > >
> > > 4. Method deprecation: Are you referring to StateStore#getPosition()? As I understand it, Position contains the position of the *source* topics, whereas the commit offsets would be the *changelog* offsets. So it's still necessary to retain the Position data, as well as the changelog offsets. What I meant in the KIP is that Position offsets are currently stored in a file, and since we can atomically store metadata along with the record batch we commit to RocksDB, we can move our Position offsets into this metadata too, and gain the same transactional guarantees that we will for changelog offsets, ensuring that the Position offsets are consistent with the records that are read from the database.
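> > > Concretely, I'm picturing something along these lines; the reserved key names and the dedicated "offsets" column family are illustrative choices, not part of the KIP text:
> > >
> > >     import java.nio.ByteBuffer;
> > >     import java.nio.charset.StandardCharsets;
> > >     import java.util.Map;
> > >     import org.rocksdb.ColumnFamilyHandle;
> > >     import org.rocksdb.RocksDB;
> > >     import org.rocksdb.RocksDBException;
> > >     import org.rocksdb.WriteBatch;
> > >     import org.rocksdb.WriteOptions;
> > >
> > >     // Sketch: append the changelog offset and the Position offsets to the
> > >     // same WriteBatch as the buffered records, then write it in one shot, so
> > >     // records and both kinds of offsets are committed atomically.
> > >     class AtomicOffsetCommit {
> > >         private static final byte[] CHANGELOG_OFFSET_KEY =
> > >             "__changelog_offset".getBytes(StandardCharsets.UTF_8);
> > >
> > >         static void commit(final RocksDB db,
> > >                            final ColumnFamilyHandle offsetsCf,
> > >                            final WriteBatch bufferedRecords,
> > >                            final long changelogOffset,
> > >                            final Map<String, Long> positionOffsets) throws RocksDBException {
> > >             bufferedRecords.put(offsetsCf, CHANGELOG_OFFSET_KEY, longToBytes(changelogOffset));
> > >             for (final Map.Entry<String, Long> entry : positionOffsets.entrySet()) {
> > >                 bufferedRecords.put(offsetsCf,
> > >                                     ("__position_" + entry.getKey()).getBytes(StandardCharsets.UTF_8),
> > >                                     longToBytes(entry.getValue()));
> > >             }
> > >             try (WriteOptions options = new WriteOptions()) {
> > >                 db.write(options, bufferedRecords); // all-or-nothing
> > >             }
> > >         }
> > >
> > >         private static byte[] longToBytes(final long value) {
> > >             return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
> > >         }
> > >     }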
> > > Regards,
> > > Nick
> > >
> > > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Thanks for publishing this alternative, Nick!
> > > >
> > > > The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis, showing that for most use cases, the write batch approach should be just fine.
> > > >
> > > > There are a couple of points that would hold me back from approving this KIP right now:
> > > >
> > > > 1. Loss of coverage for custom stores.
> > > > The fact that you can plug in a (relatively) simple implementation of the XStateStore interfaces and automagically get a distributed database out of it is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be better to spend some time and come up with a way to preserve that property. For example, can we provide a default implementation of `commit(..)` that re-implements the existing checkpoint-file approach? Or perhaps add an `isTransactional()` flag to the state store interface so that the runtime can decide whether to continue to manage checkpoint files vs delegating transactionality to the stores?
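> > > > For example, either of the following would keep existing custom stores working unchanged; the signatures are only illustrative:
> > > >
> > > >     import java.util.Map;
> > > >
> > > >     // Sketch of the two alternatives.
> > > >     interface StateStoreLike {
> > > >         void flush();
> > > >
> > > >         // Alternative A: commit() defaults to today's behaviour, so custom
> > > >         // stores keep flushing and the runtime keeps writing the checkpoint
> > > >         // file for them.
> > > >         default void commit(final Map<String, Long> changelogOffsets) {
> > > >             flush();
> > > >         }
> > > >
> > > >         // Alternative B: a capability flag; the runtime keeps managing
> > > >         // checkpoint files for any store that reports false here.
> > > >         default boolean isTransactional() {
> > > >             return false;
> > > >         }
> > > >     }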
> > > > 2. Guarding against OOME
> > > > I appreciate your analysis, but I don't think it's sufficient to say that we will solve the memory problem later if it becomes necessary. The experience leading to that situation would be quite bad: imagine you upgrade to AK 3.next, your tests pass, so you deploy to production. That night, you get paged because your app is now crashing with OOMEs. As with all OOMEs, you'll have a really hard time finding the root cause, and once you do, you won't have a clear path to resolve the issue. You could only tune down the commit interval and cache buffer size until you stop getting crashes.
> > > >
> > > > FYI, I know of multiple cases where people run EOS with much larger commit intervals to get better batching than the default, so I don't think this pathological case would be as rare as you suspect.
> > > >
> > > > Given that we already have the rudiments of an idea of what we could do to prevent this downside, we should take the time to design a solution. We owe it to our users to ensure that awesome new features don't come with bitter pills unless we can't avoid it.
> > > >
> > > > 3. ALOS mode.
> > > > On the other hand, I didn't see an indication of how stores will be handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store and the processing mode are orthogonal. A transactional store would serve ALOS just as well as a non-transactional one (if not better). Under ALOS, though, the default commit interval is five minutes, so the memory issue is far more pressing.
> > > >
> > > > As I see it, we have several options to resolve this point. We could demonstrate that transactional stores work just fine for ALOS and we can therefore just swap over unconditionally. We could also disable the transactional mechanism under ALOS so that stores operate just the same as they do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and make transactional stores opt-in (it'd be better to avoid the extra opt-in mechanism, but it's a good get-out-of-jail-free card).
> > > >
> > > > 4. (minor point) Deprecation of methods
> > > > You mentioned that the new `commit` method replaces flush, updateChangelogOffsets, and checkpoint. It seems to me that the point about atomicity and Position also suggests that it replaces the Position callbacks. However, the proposal only deprecates `flush`. Should we be deprecating other methods as well?
> > > >
> > > > Thanks again for the KIP! It's really nice that you and Alex will get the chance to collaborate on both directions so that we can get the best outcome for Streams and its users.
> > > >
> > > > -John
> > > >
> > > > On 2022/11/21 15:02:15 Nick Telford wrote:
> > > > > Hi everyone,
> > > > >
> > > > > As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores.
> > > > >
> > > > > I've published this separately as KIP-892: Transactional Semantics for StateStores
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
> > > > > so that it can be discussed/reviewed separately from KIP-844.
> > > > >
> > > > > Alex: I'm especially interested in what you think!
> > > > >
> > > > > I have a nearly complete implementation of the changes outlined in this KIP, please let me know if you'd like me to push them for review in advance of a vote.
> > > > > Regards,
> > > > >
> > > > > Nick