> Note that users can disable the cache, which would still be ok, I think. We wouldn't ingest the SST files on every record, but just append to them and only ingest them on commit, when we're already waiting for acks and a RocksDB commit.
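For reference, the append-then-ingest flow quoted above could look roughly like this with the RocksJava API; the class name, the sorted in-memory buffer and the SST path are illustrative assumptions, not part of the proposal:

    import java.util.Collections;
    import java.util.Map;
    import java.util.SortedMap;
    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    class SstIngestSketch {
        // Illustrative only: write the buffered (uncommitted) records straight
        // into an SST file, then ingest that file atomically once the changelog
        // ack arrives. SstFileWriter requires keys in comparator order, hence
        // the sorted buffer.
        static void flushAndIngest(final RocksDB db,
                                   final SortedMap<byte[], byte[]> uncommitted,
                                   final String sstPath) throws RocksDBException {
            try (final Options options = new Options();
                 final EnvOptions envOptions = new EnvOptions();
                 final SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(sstPath);
                for (final Map.Entry<byte[], byte[]> entry : uncommitted.entrySet()) {
                    writer.put(entry.getKey(), entry.getValue());
                }
                writer.finish();
            }
            // ...then, once the changelog producer has acked this batch:
            try (final IngestExternalFileOptions ingest = new IngestExternalFileOptions()) {
                db.ingestExternalFile(Collections.singletonList(sstPath), ingest);
            }
        }
    }

Ingesting the finished file is the atomic step, so nothing in the batch becomes visible to readers until the changelog ack has arrived.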
In this case, how would uncommitted records be read by joins? On Tue, 20 Jun 2023, 20:51 John Roesler, <vvcep...@apache.org> wrote: > Ah, sorry Nick, > > I just meant the regular heap based cache that we maintain in Streams. I > see that it's not called "RecordCache" (my mistake). > > The actual cache is ThreadCache: > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java > > Here's the example of how we use the cache in KeyValueStore: > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java > > It's basically just an on-heap Map of records that have not yet been > written to the changelog or flushed into the underlying store. It gets > flushed when the total cache size exceeds `cache.max.bytes.buffering` or > the `commit.interval.ms` elapses. > > Speaking of those configs, another benefit to this idea is that we would > no longer need to trigger extra commits based on the size of the ongoing > transaction. Instead, we'd just preserve the existing cache-flush > behavior. Note that users can disable the cache, which would still be > ok, I think. We wouldn't ingest the SST files on every record, but just > append to them and only ingest them on commit, when we're already > waiting for acks and a RocksDB commit. > > Thanks, > -John > > On 6/20/23 14:09, Nick Telford wrote: > > Hi John, > > > > By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any > > class called "RecordCache"... > > > > Cheers, > > > > Nick > > > > On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org> wrote: > > > >> Hi Nick, > >> > >> Thanks for picking this up again! > >> > >> I did have one new thought over the intervening months, which I'd like > >> your take on. > >> > >> What if, instead of using the RocksDB atomic write primitive at all, we > >> instead just: > >> 1. disable memtables entirely > >> 2. directly write the RecordCache into SST files when we flush > >> 3. atomically ingest the SST file(s) into RocksDB when we get the ACK > >> from the changelog (see > >> > >> > https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md > >> and > >> > >> > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java > >> and > >> > >> > https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429 > >> ) > >> 4. track the changelog offsets either in another CF or the same CF with > >> a reserved key, either of which will make the changelog offset update > >> atomic with the file ingestions > >> > >> I suspect this'll have a number of benefits: > >> * writes to RocksDB will always be atomic > >> * we don't fragment memory between the RecordCache and the memtables > >> * RecordCache gives far higher performance than memtable for reads and > >> writes > >> * we don't need any new "transaction" concepts or memory bound configs > >> > >> What do you think? > >> > >> Thanks, > >> -John > >> > >> On 6/20/23 10:51, Nick Telford wrote: > >>> Hi Bruno, > >>> > >>> Thanks for reviewing the KIP. It's been a long road, I started working > on > >>> this more than a year ago, and most of the time in the last 6 months > has > >>> been spent on the "Atomic Checkpointing" stuff that's been benched, so > >> some > >>> of the reasoning behind some of my decisions have been lost, but I'll > do > >> my > >>> best to reconstruct them. > >>> > >>> 1. 
> >>> IIRC, this was the initial approach I tried. I don't remember the exact > >>> reasons I changed it to use a separate "view" of the StateStore that > >>> encapsulates the transaction, but I believe it had something to do with > >>> concurrent access to the StateStore from Interactive Query threads. > Reads > >>> from interactive queries need to be isolated from the currently ongoing > >>> transaction, both for consistency (so interactive queries don't observe > >>> changes that are subsequently rolled-back), but also to prevent > Iterators > >>> opened by an interactive query from being closed and invalidated by the > >>> StreamThread when it commits the transaction, which causes your > >> interactive > >>> queries to crash. > >>> > >>> Another reason I believe I implemented it this way was a separation of > >>> concerns. Recall that newTransaction() originally created an object of > >> type > >>> Transaction, not StateStore. My intent was to improve the type-safety > of > >>> the API, in an effort to ensure Transactions weren't used incorrectly. > >>> Unfortunately, this didn't pan out, but newTransaction() remained. > >>> > >>> Finally, this had the added benefit that implementations could easily > add > >>> support for transactions *without* re-writing their existing, > >>> non-transactional implementation. I think this can be a benefit both > for > >>> implementers of custom StateStores, but also for anyone extending > >>> RocksDbStore, as they can rely on the existing access methods working > how > >>> they expect them to. > >>> > >>> I'm not too happy with the way the current design has panned out, so > I'm > >>> open to ideas on how to improve it. Key to this is finding some way to > >>> ensure that reads from Interactive Query threads are properly isolated > >> from > >>> the transaction, *without* the performance overhead of checking which > >>> thread the method is being called from on every access. > >>> > >>> As for replacing flush() with commit() - I saw no reason to add this > >>> complexity to the KIP, unless there was a need to add arguments to the > >>> flush/commit method. This need arises with Atomic Checkpointing, but > that > >>> will be implemented separately, in a future KIP. Do you see a need for > >> some > >>> arguments to the flush/commit method that I've missed? Or were you > simply > >>> suggesting a rename? > >>> > >>> 2. > >>> This is simply due to the practical reason that isolationLevel() is > >> really > >>> a proxy for checking if the app is under EOS. The application > >> configuration > >>> is not provided to the constructor of StateStores, but it *is* provided > >> to > >>> init(), via StateStoreContext. For this reason, it seemed somewhat > >> natural > >>> to add it to StateStoreContext. I think this makes sense, since the > >>> IsolationLevel of all StateStores in an application *must* be the same, > >> and > >>> since those stores are all initialized with the same StateStoreContext, > >> it > >>> seems natural for that context to carry the desired IsolationLevel to > >> use. > >>> > >>> 3. > >>> Using IsolationLevel instead of just passing `boolean eosEnabled`, like > >>> much of the internals was an attempt to logically de-couple the > >> StateStore > >>> API from the internals of Kafka Streams. Technically, StateStores don't > >>> need to know/care what processing mode the KS app is using, all they > need > >>> to know is the isolation level expected of them. 
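As a rough sketch of what that means for an implementation (isolationLevel() is the accessor proposed in the KIP draft; the buffering field and everything else here are assumptions about one possible implementation):

    // Sketch only: the store picks its write path from the context handed to
    // init(), without knowing anything about EOS/ALOS internals.
    public void init(final StateStoreContext context, final StateStore root) {
        final boolean readCommitted =
            context.isolationLevel() == IsolationLevel.READ_COMMITTED;
        // READ_COMMITTED (EOS): buffer writes until commit().
        // READ_UNCOMMITTED (ALOS): write straight through, as today.
        this.writeBatch = readCommitted ? new WriteBatchWithIndex(true) : null;
    }
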
> >>> > >>> Having formal definitions for the expectations of the two required > >>> IsolationLevels allow implementers to implement transactional stores > >>> without having to dig through the internals of Kafka Streams and > >> understand > >>> exactly how they are used. The tight coupling between state stores and > >>> internal behaviour has actually significantly hindered my progress on > >> this > >>> KIP, and encouraged me to avoid increasing this logical coupling as > much > >> as > >>> possible. > >>> > >>> This also frees implementations to satisfy those requirements in any > way > >>> they choose. Transactions might not be the only/available approach to > an > >>> implementation, but they might have an alternative way to satisfy the > >>> isolation requirements. I admit that this point is more about > semantics, > >>> but "transactional" would need to be formally defined in order for > >>> implementers to provide a valid implementation, and these > IsolationLevels > >>> provide that formal definition. > >>> > >>> 4. > >>> I can remove them. I added them only as I planned to include them in > the > >>> org.apache.kafka.streams.state package, as a recommended base > >>> implementation for all StateStores, including those implemented by > >> users. I > >>> had assumed that anything in "public" packages, such as > >>> org.apache.kafka.streams.state, should be included in a KIP. Is that > >> wrong? > >>> > >>> 5. > >>> RocksDB provides no way to measure the actual size of a > >>> WriteBatch(WithIndex), so we're limited to tracking the sum total of > the > >>> size of keys + values that are written to the transaction. This > obviously > >>> under-estimates the actual memory usage, because WriteBatch no-doubt > >>> includes some record overheads, and WriteBatchWithIndex has to maintain > >> an > >>> index. > >>> > >>> Ideally, we could trivially add a method upstream to > WriteBatchInterface > >>> that provides the exact size of the batch, but that would require an > >>> upgrade of RocksDB, which won't happen soon. So for the time being, > we're > >>> stuck with an approximation, so I felt that the new method should > reflect > >>> that. > >>> > >>> Would you prefer the new method name ignores this constraint and that > we > >>> simply make the rocks measurement more accurate in the future? > >>> > >>> 6. > >>> Done > >>> > >>> 7. > >>> Very good point. The KIP already specifically calls out memory in the > >>> documentation of the config: "Maximum number of memory bytes to be used > >> to > >>> buffer uncommitted state-store records." - did you have something else > in > >>> mind? > >>> > >>> Should we also make this clearer by renaming the config property > itself? > >>> Perhaps to something like statestore.transaction.buffer.max.bytes? > >>> > >>> 8. > >>> OK, I can remove this. The intent here was to describe how Streams > itself > >>> will manage transaction roll-over etc. Presumably that means we also > >> don't > >>> need a description of how Streams will manage the commit of changelog > >>> transactions, state store transactions and checkpointing? > >>> > >>> 9. > >>> What do you mean by fail-over? Do you mean failing over an Active Task > to > >>> an instance already hosting a Standby Task? > >>> > >>> Thanks again and sorry for the essay of a response! > >>> > >>> Regards, > >>> Nick > >>> > >>> On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> > wrote: > >>> > >>>> Hi Nick, > >>>> > >>>> Thanks for the updates! 
> >>>> > >>>> I really appreciate that you simplified the KIP by removing some > >>>> aspects. As I have already told you, I think the removed aspects are > >>>> also good ideas and we can discuss them on follow-up KIPs. > >>>> > >>>> Regarding the current KIP, I have the following feedback. > >>>> > >>>> 1. > >>>> Is there a good reason to add method newTransaction() to the > StateStore > >>>> interface? As far as I understand, the idea is that users of a state > >>>> store (transactional or not) call this method at start-up and after > each > >>>> commit. Since the call to newTransaction() is done in any case and I > >>>> think it would simplify the caller code if we just start a new > >>>> transaction after a commit in the implementation? > >>>> As far as I understand, you plan to commit the transaction in the > >>>> flush() method. I find the idea to replace flush() with commit() > >>>> presented in KIP-844 an elegant solution. > >>>> > >>>> 2. > >>>> Why is the method to query the isolation level added to the state > store > >>>> context? > >>>> > >>>> 3. > >>>> Do we need all the isolation level definitions? I think it is good to > >>>> know the guarantees of the transactionality of the state store. > >>>> However, currently, Streams guarantees that there will only be one > >>>> transaction that writes to the state store. Only the stream thread > that > >>>> executes the active task that owns the state store will write to the > >>>> state store. I think it should be enough to know if the state store is > >>>> transactional or not. So my proposal would be to just add a method on > >>>> the state store interface the returns if a state store is > transactional > >>>> or not by returning a boolean or an enum. > >>>> > >>>> 4. > >>>> I am wondering why AbstractTransaction and AbstractTransactionalStore > >>>> are part of the KIP. They look like implementation details that should > >>>> not be exposed in the public API. > >>>> > >>>> 5. > >>>> Why does StateStore#approximateNumUncommittedBytes() return an > >>>> approximate number of bytes? > >>>> > >>>> 6. > >>>> RocksDB is just one implementation of the state stores in Streams. > >>>> However, the issues regarding OOM errors might also apply to other > >>>> custom implementations. So in the KIP I would extract that part from > >>>> section "RocksDB Transaction". I would also move section "RocksDB > >>>> Transaction" to the end of section "Proposed Changes" and handle it as > >>>> an example implementation for a state store. > >>>> > >>>> 7. > >>>> Should statestore.uncommitted.max.bytes only limit the uncommitted > bytes > >>>> or the uncommitted bytes that reside in memory? In future, other > >>>> transactional state store implementations might implement a buffer for > >>>> uncommitted records that are able to spill records on disk. I think > >>>> statestore.uncommitted.max.bytes needs to limit the uncommitted bytes > >>>> irrespective if they reside in memory or disk. Since Streams will use > >>>> this config to decide if it needs to trigger a commit, state store > >>>> implementations that can spill to disk will never be able to spill to > >>>> disk. You would only need to change the doc of the config, if you > agree > >>>> with me. > >>>> > >>>> 8. > >>>> Section "Transaction Management" about the wrappers is rather a > >>>> implementation detail that should not be in the KIP. > >>>> > >>>> 9. > >>>> Could you add a section that describes how failover will work with the > >>>> transactional state stores? 
I think section "Error handling" is > already > >>>> a good start. > >>>> > >>>> > >>>> Best, > >>>> Bruno > >>>> > >>>> > >>>> > >>>> > >>>> On 15.05.23 11:04, Nick Telford wrote: > >>>>> Hi everyone, > >>>>> > >>>>> Quick update: I've added a new section to the KIP: "Offsets for > >> Consumer > >>>>> Rebalances", that outlines my solution to the problem that > >>>>> StreamsPartitionAssignor needs to read StateStore offsets even if > >> they're > >>>>> not currently open. > >>>>> > >>>>> Regards, > >>>>> Nick > >>>>> > >>>>> On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> > >>>> wrote: > >>>>> > >>>>>> Hi Bruno, > >>>>>> > >>>>>> Thanks for reviewing my proposal. > >>>>>> > >>>>>> 1. > >>>>>> The main reason I added it was because it was easy to do. If we see > no > >>>>>> value in it, I can remove it. > >>>>>> > >>>>>> 2. > >>>>>> Global StateStores can have multiple partitions in their input > topics > >>>>>> (which function as their changelogs), so they would have more than > one > >>>>>> partition. > >>>>>> > >>>>>> 3. > >>>>>> That's a good point. At present, the only method it adds is > >>>>>> isolationLevel(), which is likely not necessary outside of > >> StateStores. > >>>>>> It *does* provide slightly different guarantees in the documentation > >> to > >>>>>> several of the methods (hence the overrides). I'm not sure if this > is > >>>>>> enough to warrant a new interface though. > >>>>>> I think the question that remains is whether this interface makes it > >>>>>> easier to implement custom transactional StateStores than if we were > >> to > >>>>>> remove it? Probably not. > >>>>>> > >>>>>> 4. > >>>>>> The main motivation for the Atomic Checkpointing is actually > >>>> performance. > >>>>>> My team has been testing out an implementation of this KIP without > it, > >>>> and > >>>>>> we had problems with RocksDB doing *much* more compaction, due to > the > >>>>>> significantly increased flush rate. It was enough of a problem that > >> (for > >>>>>> the time being), we had to revert back to Kafka Streams proper. > >>>>>> I think the best way to solve this, as you say, is to keep the > >>>> .checkpoint > >>>>>> files *in addition* to the offsets being stored within the store > >> itself. > >>>>>> Essentially, when closing StateStores, we force a memtable flush, > then > >>>>>> call getCommittedOffsets and write those out to the .checkpoint > file. > >>>>>> That would ensure the metadata is available to the > >>>>>> StreamsPartitionAssignor for all closed stores. > >>>>>> If there's a crash (no clean close), then we won't be able to > >> guarantee > >>>>>> which offsets were flushed to disk by RocksDB, so we'd need to open > ( > >>>>>> init()), read offsets, and then close() those stores. But since this > >> is > >>>>>> the exception, and will only occur once (provided it doesn't crash > >> every > >>>>>> time!), I think the performance impact here would be acceptable. > >>>>>> > >>>>>> Thanks for the feedback, please let me know if you have any more > >>>> comments > >>>>>> or questions! > >>>>>> > >>>>>> I'm currently working on rebasing against trunk. This involves > adding > >>>>>> support for transactionality to VersionedStateStores. I will > probably > >>>> need > >>>>>> to revise my implementation for transactional "segmented" stores, > both > >>>> to > >>>>>> accommodate VersionedStateStore, and to clean up some other stuff. 
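To make the clean-close flow described above concrete, a minimal sketch; getCommittedOffsets() is the KIP-draft method, OffsetCheckpoint is the existing Streams class, and the checkpoint path and exception handling are simplified:

    // Clean close: flush, read the committed offsets out of the store, close,
    // then persist them in the usual .checkpoint file for the assignor.
    store.flush();                                          // force the memtable flush
    final Map<TopicPartition, Long> offsets = store.getCommittedOffsets();
    store.close();
    new OffsetCheckpoint(new File(stateDir, ".checkpoint")).write(offsets);
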
> >>>>>> > >>>>>> Regards, > >>>>>> Nick > >>>>>> > >>>>>> > >>>>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> > >> wrote: > >>>>>> > >>>>>>> Hi Nick, > >>>>>>> > >>>>>>> Thanks for the updates! > >>>>>>> > >>>>>>> I have a couple of questions/comments. > >>>>>>> > >>>>>>> 1. > >>>>>>> Why do you propose a configuration that involves max. bytes and > max. > >>>>>>> reords? I think we are mainly concerned about memory consumption > >>>> because > >>>>>>> we want to limit the off-heap memory used. I cannot think of a case > >>>>>>> where one would want to set the max. number of records. > >>>>>>> > >>>>>>> > >>>>>>> 2. > >>>>>>> Why does > >>>>>>> > >>>>>>> default void commit(final Map<TopicPartition, Long> > >>>> changelogOffsets) { > >>>>>>> flush(); > >>>>>>> } > >>>>>>> > >>>>>>> take a map of partitions to changelog offsets? > >>>>>>> The mapping between state stores to partitions is a 1:1 > relationship. > >>>>>>> Passing in a single changelog offset should suffice. > >>>>>>> > >>>>>>> > >>>>>>> 3. > >>>>>>> Why do we need the Transaction interface? It should be possible to > >> hide > >>>>>>> beginning and committing a transactions withing the state store > >>>>>>> implementation, so that from outside the state store, it does not > >>>> matter > >>>>>>> whether the state store is transactional or not. What would be the > >>>>>>> advantage of using the Transaction interface? > >>>>>>> > >>>>>>> > >>>>>>> 4. > >>>>>>> Regarding checkpointing offsets, I think we should keep the > >> checkpoint > >>>>>>> file in any case for the reason you mentioned about rebalancing. > Even > >>>> if > >>>>>>> that would not be an issue, I would propose to move the change to > >>>> offset > >>>>>>> management to a new KIP and to not add more complexity than needed > to > >>>>>>> this one. I would not be too concerned about the consistency > >> violation > >>>>>>> you mention. As far as I understand, with transactional state > stores > >>>>>>> Streams would write the checkpoint file during every commit even > >> under > >>>>>>> EOS. In the failure case you describe, Streams would restore the > >> state > >>>>>>> stores from the offsets found in the checkpoint file written during > >> the > >>>>>>> penultimate commit instead of during the last commit. Basically, > >>>> Streams > >>>>>>> would overwrite the records written to the state store between the > >> last > >>>>>>> two commits with the same records read from the changelogs. While I > >>>>>>> understand that this is wasteful, it is -- at the same time -- > >>>>>>> acceptable and most importantly it does not break EOS. > >>>>>>> > >>>>>>> Best, > >>>>>>> Bruno > >>>>>>> > >>>>>>> > >>>>>>> On 27.04.23 12:34, Nick Telford wrote: > >>>>>>>> Hi everyone, > >>>>>>>> > >>>>>>>> I find myself (again) considering removing the offset management > >> from > >>>>>>>> StateStores, and keeping the old checkpoint file system. The > reason > >> is > >>>>>>> that > >>>>>>>> the StreamPartitionAssignor directly reads checkpoint files in > order > >>>> to > >>>>>>>> determine which instance has the most up-to-date copy of the local > >>>>>>> state. > >>>>>>>> If we move offsets into the StateStore itself, then we will need > to > >>>>>>> open, > >>>>>>>> initialize, read offsets and then close each StateStore (that is > not > >>>>>>>> already assigned and open) for which we have *any* local state, on > >>>> every > >>>>>>>> rebalance. 
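Roughly, that per-rebalance cost would look like the following; every name in this sketch is illustrative rather than an existing API:

    // What every rebalance would have to do for each local store that is not
    // currently assigned, purely to read its offsets out of RocksDB.
    for (final TaskId taskId : tasksWithLocalStateButNoAssignment) {
        final StateStore store = openAndInit(taskId);   // open + init() just to read offsets
        offsetSums.put(taskId, store.getCommittedOffsets());
        store.close();                                  // closed again immediately
    }
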
> >>>>>>>> > >>>>>>>> Generally, I don't think there are many "orphan" stores like this > >>>>>>> sitting > >>>>>>>> around on most instances, but even a few would introduce > additional > >>>>>>> latency > >>>>>>>> to an already somewhat lengthy rebalance procedure. > >>>>>>>> > >>>>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping > things > >>>> in > >>>>>>> the > >>>>>>>> checkpoint file(s) for now, and not worrying about the race. The > >>>>>>> downside > >>>>>>>> is that we wouldn't be able to remove the explicit RocksDB flush > >>>>>>> on-commit, > >>>>>>>> which likely hurts performance. > >>>>>>>> > >>>>>>>> If anyone has any thoughts or ideas on this subject, I would > >>>> appreciate > >>>>>>> it! > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> Nick > >>>>>>>> > >>>>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford < > nick.telf...@gmail.com> > >>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Colt, > >>>>>>>>> > >>>>>>>>> The issue is that if there's a crash between 2 and 3, then you > >> still > >>>>>>> end > >>>>>>>>> up with inconsistent data in RocksDB. The only way to guarantee > >> that > >>>>>>> your > >>>>>>>>> checkpoint offsets and locally stored data are consistent with > each > >>>>>>> other > >>>>>>>>> are to atomically commit them, which can be achieved by having > the > >>>>>>> offsets > >>>>>>>>> stored in RocksDB. > >>>>>>>>> > >>>>>>>>> The offsets column family is likely to be extremely small (one > >>>>>>>>> per-changelog partition + one per Topology input partition for > >>>> regular > >>>>>>>>> stores, one per input partition for global stores). So the > overhead > >>>>>>> will be > >>>>>>>>> minimal. > >>>>>>>>> > >>>>>>>>> A major benefit of doing this is that we can remove the explicit > >>>> calls > >>>>>>> to > >>>>>>>>> db.flush(), which forcibly flushes memtables to disk on-commit. > It > >>>>>>> turns > >>>>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka > Streams > >>>>>>>>> commits, *not* RocksDB configuration, which could be a major > source > >>>> of > >>>>>>>>> confusion. Atomic checkpointing makes it safe to remove these > >>>> explicit > >>>>>>>>> flushes, because it no longer matters exactly when RocksDB > flushes > >>>>>>> data to > >>>>>>>>> disk; since the data and corresponding checkpoint offsets will > >> always > >>>>>>> be > >>>>>>>>> flushed together, the local store is always in a consistent > state, > >>>> and > >>>>>>>>> on-restart, it can always safely resume restoration from the > >> on-disk > >>>>>>>>> offsets, restoring the small amount of data that hadn't been > >> flushed > >>>>>>> when > >>>>>>>>> the app exited/crashed. > >>>>>>>>> > >>>>>>>>> Regards, > >>>>>>>>> Nick > >>>>>>>>> > >>>>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io> > >>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Nick, > >>>>>>>>>> > >>>>>>>>>> Thanks for your reply. Ack to A) and B). > >>>>>>>>>> > >>>>>>>>>> For item C), I see what you're referring to. Your proposed > >> solution > >>>>>>> will > >>>>>>>>>> work, so no need to change it. What I was suggesting was that it > >>>>>>> might be > >>>>>>>>>> possible to achieve this with only one column family. So long > as: > >>>>>>>>>> > >>>>>>>>>> - No uncommitted records (i.e. 
not committed to the > >> changelog) > >>>> are > >>>>>>>>>> *committed* to the state store, AND > >>>>>>>>>> - The Checkpoint offset (which refers to the changelog > >> topic) > >>>> is > >>>>>>> less > >>>>>>>>>> than or equal to the last written changelog offset in > >> rocksdb > >>>>>>>>>> > >>>>>>>>>> I don't see the need to do the full restoration from scratch. My > >>>>>>>>>> understanding was that prior to 844/892, full restorations were > >>>>>>> required > >>>>>>>>>> because there could be uncommitted records written to RocksDB; > >>>>>>> however, > >>>>>>>>>> given your use of RocksDB transactions, that can be avoided with > >> the > >>>>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB > >>>>>>> transaction, 3) > >>>>>>>>>> update offset in checkpoint file. > >>>>>>>>>> > >>>>>>>>>> Anyways, your proposed solution works equivalently and I don't > >>>> believe > >>>>>>>>>> there is much overhead to an additional column family in > RocksDB. > >>>>>>> Perhaps > >>>>>>>>>> it may even perform better than making separate writes to the > >>>>>>> checkpoint > >>>>>>>>>> file. > >>>>>>>>>> > >>>>>>>>>> Colt McNealy > >>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford < > >>>> nick.telf...@gmail.com> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Colt, > >>>>>>>>>>> > >>>>>>>>>>> A. I've done my best to de-couple the StateStore stuff from the > >>>> rest > >>>>>>> of > >>>>>>>>>> the > >>>>>>>>>>> Streams engine. The fact that there will be only one ongoing > >>>> (write) > >>>>>>>>>>> transaction at a time is not guaranteed by any API, and is > just a > >>>>>>>>>>> consequence of the way Streams operates. To that end, I tried > to > >>>>>>> ensure > >>>>>>>>>> the > >>>>>>>>>>> documentation and guarantees provided by the new APIs are > >>>>>>> independent of > >>>>>>>>>>> this incidental behaviour. In practice, you're right, this > >>>>>>> essentially > >>>>>>>>>>> refers to "interactive queries", which are technically "read > >>>>>>>>>> transactions", > >>>>>>>>>>> even if they don't actually use the transaction API to isolate > >>>>>>>>>> themselves. > >>>>>>>>>>> > >>>>>>>>>>> B. Yes, although not ideal. This is for backwards > compatibility, > >>>>>>>>>> because: > >>>>>>>>>>> 1) Existing custom StateStore implementations will > >> implement > >>>>>>>>>> flush(), > >>>>>>>>>>> and not commit(), but the Streams engine now calls commit(), so > >>>> those > >>>>>>>>>> calls > >>>>>>>>>>> need to be forwarded to flush() for these legacy stores. > >>>>>>>>>>> 2) Existing StateStore *users*, i.e. outside of the > >> Streams > >>>>>>> engine > >>>>>>>>>>> itself, may depend on explicitly calling flush(), so for these > >>>> cases, > >>>>>>>>>>> flush() needs to be redirected to call commit(). > >>>>>>>>>>> If anyone has a better way to guarantee compatibility without > >>>>>>>>>> introducing > >>>>>>>>>>> this potential recursion loop, I'm open to changes! > >>>>>>>>>>> > >>>>>>>>>>> C. This is described in the "Atomic Checkpointing" section. > >> Offsets > >>>>>>> are > >>>>>>>>>>> stored in a separate RocksDB column family, which is guaranteed > >> to > >>>> be > >>>>>>>>>>> atomically flushed to disk with all other column families. 
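In RocksJava terms that is roughly the following; the column-family handles, the key layout and the encodeLong() helper are illustrative:

    // The offset update commits in the same atomic batch as the records it
    // describes, so the store never holds data without a matching offset.
    try (final WriteBatch batch = new WriteBatch();
         final WriteOptions writeOptions = new WriteOptions()) {
        batch.put(dataColumnFamily, rawKey, rawValue);
        batch.put(offsetsColumnFamily, changelogPartitionKey, encodeLong(offset));
        db.write(writeOptions, batch);
    }
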
The > >>>> issue > >>>>>>> of > >>>>>>>>>>> checkpoints being written to disk after commit causing > >>>> inconsistency > >>>>>>> if > >>>>>>>>>> it > >>>>>>>>>>> crashes in between is the reason why, under EOS, checkpoint > files > >>>> are > >>>>>>>>>> only > >>>>>>>>>>> written on clean shutdown. This is one of the major causes of > >> "full > >>>>>>>>>>> restorations", so moving the offsets into a place where they > can > >> be > >>>>>>>>>>> guaranteed to be atomically written with the data they > checkpoint > >>>>>>>>>> allows us > >>>>>>>>>>> to write the checkpoint offsets *on every commit*, not just on > >>>> clean > >>>>>>>>>>> shutdown. > >>>>>>>>>>> > >>>>>>>>>>> Regards, > >>>>>>>>>>> Nick > >>>>>>>>>>> > >>>>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy < > c...@littlehorse.io> > >>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Nick, > >>>>>>>>>>>> > >>>>>>>>>>>> Thank you for continuing this work. I have a few minor > >> clarifying > >>>>>>>>>>>> questions. > >>>>>>>>>>>> > >>>>>>>>>>>> A) "Records written to any transaction are visible to all > other > >>>>>>>>>>>> transactions immediately." I am confused here—I thought there > >>>> could > >>>>>>>>>> only > >>>>>>>>>>> be > >>>>>>>>>>>> one transaction going on at a time for a given state store > given > >>>> the > >>>>>>>>>>>> threading model for processing records on a Task. Do you mean > >>>>>>>>>> Interactive > >>>>>>>>>>>> Queries by "other transactions"? (If so, then everything makes > >>>>>>> sense—I > >>>>>>>>>>>> thought that since IQ were read-only then they didn't count as > >>>>>>>>>>>> transactions). > >>>>>>>>>>>> > >>>>>>>>>>>> B) Is it intentional that the default implementations of the > >>>> flush() > >>>>>>>>>> and > >>>>>>>>>>>> commit() methods in the StateStore class refer to each other > in > >>>> some > >>>>>>>>>> sort > >>>>>>>>>>>> of unbounded recursion? > >>>>>>>>>>>> > >>>>>>>>>>>> C) How will the getCommittedOffset() method work? At first I > >>>> thought > >>>>>>>>>> the > >>>>>>>>>>>> way to do it would be using a special key in the RocksDB store > >> to > >>>>>>>>>> store > >>>>>>>>>>> the > >>>>>>>>>>>> offset, and committing that with the transaction. But upon > >> second > >>>>>>>>>>> thought, > >>>>>>>>>>>> since restoration from the changelog is an idempotent > >> procedure, I > >>>>>>>>>> think > >>>>>>>>>>> it > >>>>>>>>>>>> would be fine to 1) commit the RocksDB transaction and then 2) > >>>> write > >>>>>>>>>> the > >>>>>>>>>>>> offset to disk in a checkpoint file. If there is a crash > between > >>>> 1) > >>>>>>>>>> and > >>>>>>>>>>> 2), > >>>>>>>>>>>> I think the only downside is now we replay a few more records > >> (at > >>>> a > >>>>>>>>>> cost > >>>>>>>>>>> of > >>>>>>>>>>>> <100ms). Am I missing something there? > >>>>>>>>>>>> > >>>>>>>>>>>> Other than that, everything makes sense to me. 
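For what it's worth, the reserved-key variant floated in C) might read the offset back roughly like this; the shape of the method and the helper names are assumptions, not proposed API:

    // One possible shape for getCommittedOffset(), assuming a reserved key per
    // changelog partition in an offsets column family.
    public Long getCommittedOffset(final TopicPartition partition) {
        try {
            final byte[] value = db.get(offsetsColumnFamily, keyFor(partition));
            return value == null ? null : ByteBuffer.wrap(value).getLong();
        } catch (final RocksDBException e) {
            throw new ProcessorStateException("Failed to read committed offset for " + partition, e);
        }
    }
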
> >>>>>>>>>>>> > >>>>>>>>>>>> Cheers, > >>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford < > >>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I've updated the KIP to reflect the latest version of the > >> design: > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>>> > >>>>>>>>>>>>> There are several changes in there that reflect feedback from > >>>> this > >>>>>>>>>>>> thread, > >>>>>>>>>>>>> and there's a new section and a bunch of interface changes > >>>> relating > >>>>>>>>>> to > >>>>>>>>>>>>> Atomic Checkpointing, which is the final piece of the puzzle > to > >>>>>>>>>> making > >>>>>>>>>>>>> everything robust. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Let me know what you think! > >>>>>>>>>>>>> > >>>>>>>>>>>>> Regards, > >>>>>>>>>>>>> Nick > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford < > >>>> nick.telf...@gmail.com> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Lucas, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for looking over my KIP. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a typo > in > >>>> the > >>>>>>>>>>> KIP > >>>>>>>>>>>>>> that I've now corrected. It was originally per-Task, but I > >>>>>>>>>> changed it > >>>>>>>>>>>> to > >>>>>>>>>>>>>> per-instance for exactly the reason you highlighted. > >>>>>>>>>>>>>> B) It's worth noting that transactionality is only enabled > >> under > >>>>>>>>>> EOS, > >>>>>>>>>>>> and > >>>>>>>>>>>>>> in the default mode of operation (ALOS), there should be no > >>>>>>>>>> change in > >>>>>>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the > >> impact > >>>> on > >>>>>>>>>>>> users > >>>>>>>>>>>>> by > >>>>>>>>>>>>>> sufficiently low default values for the memory bound > >>>>>>>>>> configuration. I > >>>>>>>>>>>>>> understand your hesitation to include a significant change > of > >>>>>>>>>>>> behaviour, > >>>>>>>>>>>>>> especially in a minor release, but I suspect that most users > >>>> will > >>>>>>>>>>>> prefer > >>>>>>>>>>>>>> the memory impact (under EOS) to the existing behaviour of > >>>>>>>>>> frequent > >>>>>>>>>>>> state > >>>>>>>>>>>>>> restorations! If this is a problem, the changes can wait > until > >>>> the > >>>>>>>>>>> next > >>>>>>>>>>>>>> major release. I'll be running a patched version of streams > in > >>>>>>>>>>>> production > >>>>>>>>>>>>>> with these changes as soon as they're ready, so it won't > >> disrupt > >>>>>>>>>> me > >>>>>>>>>>> :-D > >>>>>>>>>>>>>> C) The main purpose of this sentence was just to note that > >> some > >>>>>>>>>>> changes > >>>>>>>>>>>>>> will need to be made to the way Segments are handled in > order > >> to > >>>>>>>>>>> ensure > >>>>>>>>>>>>>> they also benefit from transactions. At the time I wrote > it, I > >>>>>>>>>> hadn't > >>>>>>>>>>>>>> figured out the specific changes necessary, so it was > >>>> deliberately > >>>>>>>>>>>> vague. > >>>>>>>>>>>>>> This is the one outstanding problem I'm currently working > on, > >>>> and > >>>>>>>>>>> I'll > >>>>>>>>>>>>>> update this section with more detail once I have figured out > >> the > >>>>>>>>>>> exact > >>>>>>>>>>>>>> changes required. 
> >>>>>>>>>>>>>> D) newTransaction() provides the necessary isolation > >> guarantees. > >>>>>>>>>>> While > >>>>>>>>>>>>>> the RocksDB implementation of transactions doesn't > technically > >>>>>>>>>> *need* > >>>>>>>>>>>>>> read-only users to call newTransaction(), other > >> implementations > >>>>>>>>>>> (e.g. a > >>>>>>>>>>>>>> hypothetical PostgresStore) may require it. Calling > >>>>>>>>>> newTransaction() > >>>>>>>>>>>> when > >>>>>>>>>>>>>> no transaction is necessary is essentially free, as it will > >> just > >>>>>>>>>>> return > >>>>>>>>>>>>>> this. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think it > >>>>>>>>>> should > >>>>>>>>>>> be > >>>>>>>>>>>>>> fairly obvious where the performance problems stem from: > >> writes > >>>>>>>>>> under > >>>>>>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with > the > >>>>>>>>>>>>>> tombstone/record flag, 1 to decode it from the > >> tombstone/record > >>>>>>>>>> flag, > >>>>>>>>>>>>> and 1 > >>>>>>>>>>>>>> to copy the record from the "temporary" store to the "main" > >>>> store, > >>>>>>>>>>> when > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> transaction commits. The different approach taken by KIP-869 > >>>>>>>>>> should > >>>>>>>>>>>>> perform > >>>>>>>>>>>>>> much better, as it avoids all these copies, and may actually > >>>>>>>>>> perform > >>>>>>>>>>>>>> slightly better than trunk, due to batched writes in RocksDB > >>>>>>>>>>> performing > >>>>>>>>>>>>>> better than non-batched writes.[1] > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1: > >>>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>> > >>>> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy < > >>>>>>>>>> lbruts...@confluent.io > >>>>>>>>>>>>> .invalid> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I'm just starting to read up on the whole discussion about > >>>>>>>>>> KIP-892 > >>>>>>>>>>> and > >>>>>>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think > >>>>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do have > >> some > >>>>>>>>>>>>>>> questions about the latest draft. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> A) If I understand correctly, you propose to put a > bound > >> on > >>>> the > >>>>>>>>>>>>>>> (native) memory consumed by each task. However, I wonder if > >>>> this > >>>>>>>>>> is > >>>>>>>>>>>>>>> sufficient if we have temporary imbalances in the cluster. > >> For > >>>>>>>>>>>>>>> example, depending on the timing of rebalances during a > >> cluster > >>>>>>>>>>>>>>> restart, it could happen that a single streams node is > >>>> assigned a > >>>>>>>>>>> lot > >>>>>>>>>>>>>>> more tasks than expected. With your proposed change, this > >> would > >>>>>>>>>> mean > >>>>>>>>>>>>>>> that the memory required by this one node could be a > multiple > >>>> of > >>>>>>>>>>> what > >>>>>>>>>>>>>>> is required during normal operation. I wonder if it > wouldn't > >> be > >>>>>>>>>>> safer > >>>>>>>>>>>>>>> to put a global bound on the memory use, across all tasks. > >>>>>>>>>>>>>>> B) Generally, the memory concerns still give me the > >> feeling > >>>>>>> that > >>>>>>>>>>> this > >>>>>>>>>>>>>>> should not be enabled by default for all users in a minor > >>>>>>>>>> release. 
> >>>>>>>>>>>>>>> C) In section "Transaction Management": the sentence > "A > >>>> similar > >>>>>>>>>>>>>>> analogue will be created to automatically manage `Segment` > >>>>>>>>>>>>>>> transactions.". Maybe this is just me lacking some > >> background, > >>>>>>>>>> but I > >>>>>>>>>>>>>>> do not understand this, it would be great if you could > >> clarify > >>>>>>>>>> what > >>>>>>>>>>>>>>> you mean here. > >>>>>>>>>>>>>>> D) Could you please clarify why IQ has to call > >>>>>>> newTransaction(), > >>>>>>>>>>> when > >>>>>>>>>>>>>>> it's read-only. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> And one last thing not strictly related to your KIP: if > there > >>>> is > >>>>>>>>>> an > >>>>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x > >> slower > >>>>>>>>>> (e.g. > >>>>>>>>>>>>>>> by providing a flame graph), that would be quite > interesting. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>> Lucas > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford < > >>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I've updated the KIP with a more detailed design, which > >>>>>>>>>> reflects > >>>>>>>>>>> the > >>>>>>>>>>>>>>>> implementation I've been working on: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> This new design should address the outstanding points > >> already > >>>>>>>>>> made > >>>>>>>>>>>> in > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> thread. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Please let me know if there are areas that are unclear or > >> need > >>>>>>>>>>> more > >>>>>>>>>>>>>>>> clarification. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I have a (nearly) working implementation. I'm confident > that > >>>>>>>>>> the > >>>>>>>>>>>>>>> remaining > >>>>>>>>>>>>>>>> work (making Segments behave) will not impact the > documented > >>>>>>>>>>> design. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy < > >>>> c...@littlehorse.io > >>>>>>>>>>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was hoping > >> that, > >>>>>>>>>>>> since > >>>>>>>>>>>>>>> reading > >>>>>>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the > >>>>>>>>>> documented > >>>>>>>>>>>> API, > >>>>>>>>>>>>>>> maybe > >>>>>>>>>>>>>>>>> you *wouldn't* have to wait for the next major release to > >>>>>>>>>> make > >>>>>>>>>>>> that > >>>>>>>>>>>>>>> change; > >>>>>>>>>>>>>>>>> but given that it would be considered a major change, I > >> like > >>>>>>>>>>> your > >>>>>>>>>>>>>>> approach > >>>>>>>>>>>>>>>>> the best. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding! > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford < > >>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. 
I originally intended > to > >>>>>>>>>> try > >>>>>>>>>>> to > >>>>>>>>>>>>>>> keep the > >>>>>>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise we'd > >>>>>>>>>> have > >>>>>>>>>>> to > >>>>>>>>>>>>>>> wait for > >>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>> major version release to land these changes. > >>>>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of > >>>>>>>>>> guarantee, > >>>>>>>>>>>> and > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> typically longer commit intervals would be problematic > >> when > >>>>>>>>>>>>> reading > >>>>>>>>>>>>>>> only > >>>>>>>>>>>>>>>>>> "committed" records. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor surgery, > >>>>>>>>>> but I > >>>>>>>>>>>>>>> spent a > >>>>>>>>>>>>>>>>>> considerable amount of that time working through ideas > for > >>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>> solutions in my head. I think your suggestion of keeping > >>>>>>>>>> ALOS > >>>>>>>>>>>>>>> as-is, but > >>>>>>>>>>>>>>>>>> buffering writes for EOS is the right path forwards, > >>>>>>>>>> although > >>>>>>>>>>> I > >>>>>>>>>>>>>>> have a > >>>>>>>>>>>>>>>>>> solution that both expands on this, and provides for > some > >>>>>>>>>> more > >>>>>>>>>>>>>>> formal > >>>>>>>>>>>>>>>>>> guarantees. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for > >>>>>>>>>>>> "Transactions", > >>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read Committed" > >>>>>>>>>> when > >>>>>>>>>>>> under > >>>>>>>>>>>>>>> EOS, > >>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> "Read Uncommitted" under ALOS. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> The nice thing about this approach is that it gives us > >> much > >>>>>>>>>>> more > >>>>>>>>>>>>>>> clearly > >>>>>>>>>>>>>>>>>> defined isolation behaviour that can be properly > >>>>>>>>>> documented to > >>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>> users > >>>>>>>>>>>>>>>>>> know what to expect. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and will > >>>>>>>>>> update > >>>>>>>>>>>> the > >>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>> I have something. The main struggle is trying to > implement > >>>>>>>>>>> this > >>>>>>>>>>>>>>> without > >>>>>>>>>>>>>>>>>> making any major changes to the existing interfaces or > >>>>>>>>>>> breaking > >>>>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>> implementations, because currently everything expects to > >>>>>>>>>>> operate > >>>>>>>>>>>>>>> directly > >>>>>>>>>>>>>>>>>> on a StateStore, and not a Transaction of that store. I > >>>>>>>>>> think > >>>>>>>>>>>> I'm > >>>>>>>>>>>>>>> getting > >>>>>>>>>>>>>>>>>> close, although sadly I won't be able to progress much > >>>>>>>>>> until > >>>>>>>>>>>> next > >>>>>>>>>>>>>>> week > >>>>>>>>>>>>>>>>> due > >>>>>>>>>>>>>>>>>> to some work commitments. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy < > >>>>>>>>>>> c...@littlehorse.io> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thank you for the explanation, and also for the updated > >>>>>>>>>>> KIP. 
I > >>>>>>>>>>>>> am > >>>>>>>>>>>>>>> quite > >>>>>>>>>>>>>>>>>>> eager for this improvement to be released as it would > >>>>>>>>>>> greatly > >>>>>>>>>>>>>>> reduce > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> operational difficulties of EOS streams apps. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Two questions: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 10) > >>>>>>>>>>>>>>>>>>>> When reading records, we will use the > >>>>>>>>>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB > >>>>>>>>>>>>>>>>>>> and WriteBatchWithIndex#newIteratorWithBase > >> utilities in > >>>>>>>>>>>> order > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>>>> that uncommitted writes are available to query. > >>>>>>>>>>>>>>>>>>> Why do extra work to enable the reading of uncommitted > >>>>>>>>>>> writes > >>>>>>>>>>>>>>> during > >>>>>>>>>>>>>>>>> IQ? > >>>>>>>>>>>>>>>>>>> Code complexity aside, reading uncommitted writes is, > in > >>>>>>>>>> my > >>>>>>>>>>>>>>> opinion, a > >>>>>>>>>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have the > >>>>>>>>>>>>> guarantee > >>>>>>>>>>>>>>> that, > >>>>>>>>>>>>>>>>>>> with EOS, IQ only reads committed records. In order to > >>>>>>>>>> avoid > >>>>>>>>>>>>> dirty > >>>>>>>>>>>>>>>>> reads, > >>>>>>>>>>>>>>>>>>> one currently must query a standby replica (but this > >>>>>>>>>> still > >>>>>>>>>>>>> doesn't > >>>>>>>>>>>>>>>>> fully > >>>>>>>>>>>>>>>>>>> guarantee monotonic reads). > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> 20) Is it also necessary to enable this optimization on > >>>>>>>>>> ALOS > >>>>>>>>>>>>>>> stores? > >>>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>> motivation of KIP-844 was mainly to reduce the need to > >>>>>>>>>>> restore > >>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>> scratch on unclean EOS shutdowns; with ALOS it was > >>>>>>>>>>> acceptable > >>>>>>>>>>>> to > >>>>>>>>>>>>>>> accept > >>>>>>>>>>>>>>>>>>> that there may have been uncommitted writes on disk. > On a > >>>>>>>>>>> side > >>>>>>>>>>>>>>> note, if > >>>>>>>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>>> enable this type of store on ALOS processors, the > >>>>>>>>>> community > >>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>> definitely want to enable queries on dirty reads; > >>>>>>>>>> otherwise > >>>>>>>>>>>>> users > >>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>> have to wait 30 seconds (default) to see an update. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thank you for doing this fantastic work! > >>>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford < > >>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no > >>>>>>>>>>> longer > >>>>>>>>>>>>>>> include > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> StateStore management of checkpointing. This can be > >>>>>>>>>> added > >>>>>>>>>>>> as a > >>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>> later > >>>>>>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>>>>>> to further optimize the consistency and performance of > >>>>>>>>>>> state > >>>>>>>>>>>>>>> stores. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I've also added a section discussing some of the > >>>>>>>>>> concerns > >>>>>>>>>>>>> around > >>>>>>>>>>>>>>>>>>>> concurrency, especially in the presence of Iterators. 
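For context, the read path in question layers the uncommitted WriteBatchWithIndex over the committed DB using existing RocksJava utilities; the two calls below are real, the surrounding store plumbing is illustrative:

    // Point reads overlay the uncommitted batch on the committed DB; range
    // reads get a merged iterator. The batch mutating while such an iterator
    // is open is exactly the concurrency concern being discussed.
    final byte[] value = writeBatch.getFromBatchAndDB(db, readOptions, rawKey);
    final RocksIterator merged =
        writeBatch.newIteratorWithBase(db.newIterator(readOptions));
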
> >>>>>>>>>> I'm > >>>>>>>>>>>>>>> thinking of > >>>>>>>>>>>>>>>>>>>> wrapping WriteBatchWithIndex with a reference-counting > >>>>>>>>>>>>>>> copy-on-write > >>>>>>>>>>>>>>>>>>>> implementation (that only makes a copy if there's an > >>>>>>>>>>> active > >>>>>>>>>>>>>>>>> iterator), > >>>>>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>> I'm open to suggestions. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford < > >>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> I didn't do any profiling, but the 844 > >>>>>>>>>> implementation: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> - Writes uncommitted records to a temporary > >>>>>>>>>> RocksDB > >>>>>>>>>>>>>>> instance > >>>>>>>>>>>>>>>>>>>>> - Since tombstones need to be flagged, all > >>>>>>>>>> record > >>>>>>>>>>>>>>> values are > >>>>>>>>>>>>>>>>>>>>> prefixed with a value/tombstone marker. > This > >>>>>>>>>>>>>>> necessitates a > >>>>>>>>>>>>>>>>>>> memory > >>>>>>>>>>>>>>>>>>>> copy. > >>>>>>>>>>>>>>>>>>>>> - On-commit, iterates all records in this > >>>>>>>>>> temporary > >>>>>>>>>>>>>>> instance and > >>>>>>>>>>>>>>>>>>>>> writes them to the main RocksDB store. > >>>>>>>>>>>>>>>>>>>>> - While iterating, the value/tombstone marker > >>>>>>>>>> needs > >>>>>>>>>>> to > >>>>>>>>>>>> be > >>>>>>>>>>>>>>> parsed > >>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>> the real value extracted. This necessitates > >>>>>>>>>> another > >>>>>>>>>>>>> memory > >>>>>>>>>>>>>>> copy. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> My guess is that the cost of iterating the temporary > >>>>>>>>>>>> RocksDB > >>>>>>>>>>>>>>> store > >>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> major factor, with the 2 extra memory copies > >>>>>>>>>> per-Record > >>>>>>>>>>>>>>>>> contributing > >>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>> significant amount too. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:12, Colt McNealy < > >>>>>>>>>>>>>>> c...@littlehorse.io> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Out of curiosity, why does the performance of the > >>>>>>>>>> store > >>>>>>>>>>>>>>> degrade so > >>>>>>>>>>>>>>>>>>>>>> significantly with the 844 implementation? I > >>>>>>>>>> wouldn't > >>>>>>>>>>> be > >>>>>>>>>>>>> too > >>>>>>>>>>>>>>>>>> surprised > >>>>>>>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>> 50-60% drop (caused by each record being written > >>>>>>>>>>> twice), > >>>>>>>>>>>>> but > >>>>>>>>>>>>>>> 96% > >>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>> extreme. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> The only thing I can think of which could create > >>>>>>>>>> such a > >>>>>>>>>>>>>>> bottleneck > >>>>>>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>> that perhaps the 844 implementation deserializes and > >>>>>>>>>>> then > >>>>>>>>>>>>>>>>>>> re-serializes > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> store values when copying from the uncommitted to > >>>>>>>>>>>> committed > >>>>>>>>>>>>>>> store, > >>>>>>>>>>>>>>>>>>> but I > >>>>>>>>>>>>>>>>>>>>>> wasn't able to figure that out when I scanned the > >>>>>>>>>> PR. 
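Going by the description of the KIP-844 write path earlier in the thread, the value/tombstone flagging alone forces two extra copies per record, roughly like this (reconstructed from the description, not taken from the actual PR):

    // Every value is prefixed with a marker on write and stripped on copy-back.
    static byte[] flag(final byte[] value) {
        if (value == null) return new byte[] {0};              // tombstone marker
        final byte[] out = new byte[value.length + 1];
        out[0] = 1;
        System.arraycopy(value, 0, out, 1, value.length);      // extra copy #1
        return out;
    }

    static byte[] unflag(final byte[] stored) {
        return stored[0] == 0
            ? null
            : java.util.Arrays.copyOfRange(stored, 1, stored.length); // extra copy #2
    }
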
> >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford < > >>>>>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> I've updated the KIP to resolve all the points > >>>>>>>>>> that > >>>>>>>>>>>> have > >>>>>>>>>>>>>>> been > >>>>>>>>>>>>>>>>>> raised > >>>>>>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>>>>>>> far, with one exception: the ALOS default commit > >>>>>>>>>>>> interval > >>>>>>>>>>>>>>> of 5 > >>>>>>>>>>>>>>>>>>> minutes > >>>>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>> likely to cause WriteBatchWithIndex memory to grow > >>>>>>>>>>> too > >>>>>>>>>>>>>>> large. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> There's a couple of different things I can think > >>>>>>>>>> of > >>>>>>>>>>> to > >>>>>>>>>>>>>>> solve > >>>>>>>>>>>>>>>>> this: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> - We already have a memory/record limit in > the > >>>>>>>>>> KIP > >>>>>>>>>>>> to > >>>>>>>>>>>>>>> prevent > >>>>>>>>>>>>>>>>>> OOM > >>>>>>>>>>>>>>>>>>>>>>> errors. Should we choose a default value for > >>>>>>>>>>> these? > >>>>>>>>>>>> My > >>>>>>>>>>>>>>>>> concern > >>>>>>>>>>>>>>>>>>> here > >>>>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>>> anything we choose might seem rather > >>>>>>>>>> arbitrary. We > >>>>>>>>>>>>> could > >>>>>>>>>>>>>>>>> change > >>>>>>>>>>>>>>>>>>>>>>> its behaviour such that under ALOS, it only > >>>>>>>>>>> triggers > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> commit > >>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> StateStore, but under EOS, it triggers a > >>>>>>>>>> commit of > >>>>>>>>>>>> the > >>>>>>>>>>>>>>> Kafka > >>>>>>>>>>>>>>>>>>>>>>> transaction. > >>>>>>>>>>>>>>>>>>>>>>> - We could introduce a separate ` > >>>>>>>>>>>>> checkpoint.interval.ms` > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> allow > >>>>>>>>>>>>>>>>>>>>>> ALOS > >>>>>>>>>>>>>>>>>>>>>>> to commit the StateStores more frequently > than > >>>>>>>>>> the > >>>>>>>>>>>>>>> general > >>>>>>>>>>>>>>>>>>>>>>> commit.interval.ms? My concern here is that > >>>>>>>>>> the > >>>>>>>>>>>>>>> semantics of > >>>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>>> config > >>