Potentially we could just go the memtable-with-RocksDB-WriteBatches route if the cache is disabled?
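(For anyone skimming: a rough sketch of what that route looks like with the RocksJava API. The class and method names below are illustrative only, not the KIP's actual interfaces.)

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

// Sketch only: buffer uncommitted writes in a WriteBatchWithIndex and apply
// them to RocksDB in one atomic write when the task commits. Reads consult
// the batch first, so uncommitted records remain visible to the StreamThread.
public class WriteBatchBackedStore implements AutoCloseable {
    private final RocksDB db;
    // 'true' = overwrite duplicate keys in the index, so reads see the latest write
    private final WriteBatchWithIndex uncommitted = new WriteBatchWithIndex(true);

    public WriteBatchBackedStore(final RocksDB db) {
        this.db = db;
    }

    public void put(final byte[] key, final byte[] value) throws RocksDBException {
        uncommitted.put(key, value); // stays in memory until commit
    }

    public byte[] get(final byte[] key) throws RocksDBException {
        try (ReadOptions readOptions = new ReadOptions()) {
            // checks the uncommitted batch first, then falls back to the DB
            return uncommitted.getFromBatchAndDB(db, readOptions, key);
        }
    }

    public void commit() throws RocksDBException {
        try (WriteOptions writeOptions = new WriteOptions()) {
            db.write(writeOptions, uncommitted); // single atomic write into RocksDB
        }
        uncommitted.clear();
    }

    @Override
    public void close() {
        uncommitted.close();
    }
}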
On Tue, 20 Jun 2023, 22:00 John Roesler, <j...@vvcephei.org> wrote: > Touché! > > Ok, I agree that figuring out the case of a disabled cache would be > non-trivial. Ingesting single-record SST files will probably not be > performant, but benchmarking may prove different. Or maybe we can have > some reserved cache space on top of the user-configured cache, which we > would have reclaimed from the memtable space. Or some other, more > creative solution. > > Thanks, > -John > > On 6/20/23 15:30, Nick Telford wrote: > >> Note that users can disable the cache, which would still be > > ok, I think. We wouldn't ingest the SST files on every record, but just > > append to them and only ingest them on commit, when we're already > > waiting for acks and a RocksDB commit. > > > > In this case, how would uncommitted records be read by joins? > > > > On Tue, 20 Jun 2023, 20:51 John Roesler, <vvcep...@apache.org> wrote: > > > >> Ah, sorry Nick, > >> > >> I just meant the regular heap based cache that we maintain in Streams. I > >> see that it's not called "RecordCache" (my mistake). > >> > >> The actual cache is ThreadCache: > >> > >> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java > >> > >> Here's the example of how we use the cache in KeyValueStore: > >> > >> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java > >> > >> It's basically just an on-heap Map of records that have not yet been > >> written to the changelog or flushed into the underlying store. It gets > >> flushed when the total cache size exceeds `cache.max.bytes.buffering` or > >> the `commit.interval.ms` elapses. > >> > >> Speaking of those configs, another benefit to this idea is that we would > >> no longer need to trigger extra commits based on the size of the ongoing > >> transaction. Instead, we'd just preserve the existing cache-flush > >> behavior. Note that users can disable the cache, which would still be > >> ok, I think. We wouldn't ingest the SST files on every record, but just > >> append to them and only ingest them on commit, when we're already > >> waiting for acks and a RocksDB commit. > >> > >> Thanks, > >> -John > >> > >> On 6/20/23 14:09, Nick Telford wrote: > >>> Hi John, > >>> > >>> By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find > any > >>> class called "RecordCache"... > >>> > >>> Cheers, > >>> > >>> Nick > >>> > >>> On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org> > wrote: > >>> > >>>> Hi Nick, > >>>> > >>>> Thanks for picking this up again! > >>>> > >>>> I did have one new thought over the intervening months, which I'd like > >>>> your take on. > >>>> > >>>> What if, instead of using the RocksDB atomic write primitive at all, > we > >>>> instead just: > >>>> 1. disable memtables entirely > >>>> 2. directly write the RecordCache into SST files when we flush > >>>> 3. atomically ingest the SST file(s) into RocksDB when we get the ACK > >>>> from the changelog (see > >>>> > >>>> > >> > https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md > >>>> and > >>>> > >>>> > >> > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java > >>>> and > >>>> > >>>> > >> > https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429 > >>>> ) > >>>> 4. 
track the changelog offsets either in another CF or the same CF > with > >>>> a reserved key, either of which will make the changelog offset update > >>>> atomic with the file ingestions > >>>> > >>>> I suspect this'll have a number of benefits: > >>>> * writes to RocksDB will always be atomic > >>>> * we don't fragment memory between the RecordCache and the memtables > >>>> * RecordCache gives far higher performance than memtable for reads and > >>>> writes > >>>> * we don't need any new "transaction" concepts or memory bound configs > >>>> > >>>> What do you think? > >>>> > >>>> Thanks, > >>>> -John > >>>> > >>>> On 6/20/23 10:51, Nick Telford wrote: > >>>>> Hi Bruno, > >>>>> > >>>>> Thanks for reviewing the KIP. It's been a long road, I started > working > >> on > >>>>> this more than a year ago, and most of the time in the last 6 months > >> has > >>>>> been spent on the "Atomic Checkpointing" stuff that's been benched, > so > >>>> some > >>>>> of the reasoning behind some of my decisions have been lost, but I'll > >> do > >>>> my > >>>>> best to reconstruct them. > >>>>> > >>>>> 1. > >>>>> IIRC, this was the initial approach I tried. I don't remember the > exact > >>>>> reasons I changed it to use a separate "view" of the StateStore that > >>>>> encapsulates the transaction, but I believe it had something to do > with > >>>>> concurrent access to the StateStore from Interactive Query threads. > >> Reads > >>>>> from interactive queries need to be isolated from the currently > ongoing > >>>>> transaction, both for consistency (so interactive queries don't > observe > >>>>> changes that are subsequently rolled-back), but also to prevent > >> Iterators > >>>>> opened by an interactive query from being closed and invalidated by > the > >>>>> StreamThread when it commits the transaction, which causes your > >>>> interactive > >>>>> queries to crash. > >>>>> > >>>>> Another reason I believe I implemented it this way was a separation > of > >>>>> concerns. Recall that newTransaction() originally created an object > of > >>>> type > >>>>> Transaction, not StateStore. My intent was to improve the type-safety > >> of > >>>>> the API, in an effort to ensure Transactions weren't used > incorrectly. > >>>>> Unfortunately, this didn't pan out, but newTransaction() remained. > >>>>> > >>>>> Finally, this had the added benefit that implementations could easily > >> add > >>>>> support for transactions *without* re-writing their existing, > >>>>> non-transactional implementation. I think this can be a benefit both > >> for > >>>>> implementers of custom StateStores, but also for anyone extending > >>>>> RocksDbStore, as they can rely on the existing access methods working > >> how > >>>>> they expect them to. > >>>>> > >>>>> I'm not too happy with the way the current design has panned out, so > >> I'm > >>>>> open to ideas on how to improve it. Key to this is finding some way > to > >>>>> ensure that reads from Interactive Query threads are properly > isolated > >>>> from > >>>>> the transaction, *without* the performance overhead of checking which > >>>>> thread the method is being called from on every access. > >>>>> > >>>>> As for replacing flush() with commit() - I saw no reason to add this > >>>>> complexity to the KIP, unless there was a need to add arguments to > the > >>>>> flush/commit method. This need arises with Atomic Checkpointing, but > >> that > >>>>> will be implemented separately, in a future KIP. 
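(As an aside, John's SST-ingestion proposal quoted above would look roughly like this with the RocksJava API. This is a sketch under the assumption that the flushed cache contents are already sorted; none of these names are part of the KIP.)

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

// Sketch of steps 2-4 of the quoted proposal: dump the flushed cache into an
// SST file, then atomically ingest it once the changelog ACK arrives.
public class SstIngestSketch {

    // Step 2: write the cache contents to an SST file. Keys must be added in
    // ascending order, using the same comparator as the target column family.
    static void writeSstFile(final String path,
                             final SortedMap<byte[], byte[]> records) throws RocksDBException {
        try (EnvOptions envOptions = new EnvOptions();
             Options options = new Options();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(path);
            for (Map.Entry<byte[], byte[]> entry : records.entrySet()) {
                writer.put(entry.getKey(), entry.getValue());
            }
            // Step 4 (same-CF variant): a reserved key holding the changelog
            // offset could also be written into this file (in sort order),
            // making the offset update atomic with the data ingestion.
            writer.finish();
        }
    }

    // Step 3: on changelog ACK, ingest the file atomically into the store.
    static void ingest(final RocksDB db,
                       final ColumnFamilyHandle cf,
                       final String path) throws RocksDBException {
        final List<String> files = Collections.singletonList(path);
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            db.ingestExternalFile(cf, files, ingestOptions);
        }
    }
}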
Do you see a need > for > >>>> some > >>>>> arguments to the flush/commit method that I've missed? Or were you > >> simply > >>>>> suggesting a rename? > >>>>> > >>>>> 2. > >>>>> This is simply due to the practical reason that isolationLevel() is > >>>> really > >>>>> a proxy for checking if the app is under EOS. The application > >>>> configuration > >>>>> is not provided to the constructor of StateStores, but it *is* > provided > >>>> to > >>>>> init(), via StateStoreContext. For this reason, it seemed somewhat > >>>> natural > >>>>> to add it to StateStoreContext. I think this makes sense, since the > >>>>> IsolationLevel of all StateStores in an application *must* be the > same, > >>>> and > >>>>> since those stores are all initialized with the same > StateStoreContext, > >>>> it > >>>>> seems natural for that context to carry the desired IsolationLevel to > >>>> use. > >>>>> > >>>>> 3. > >>>>> Using IsolationLevel instead of just passing `boolean eosEnabled`, > like > >>>>> much of the internals was an attempt to logically de-couple the > >>>> StateStore > >>>>> API from the internals of Kafka Streams. Technically, StateStores > don't > >>>>> need to know/care what processing mode the KS app is using, all they > >> need > >>>>> to know is the isolation level expected of them. > >>>>> > >>>>> Having formal definitions for the expectations of the two required > >>>>> IsolationLevels allow implementers to implement transactional stores > >>>>> without having to dig through the internals of Kafka Streams and > >>>> understand > >>>>> exactly how they are used. The tight coupling between state stores > and > >>>>> internal behaviour has actually significantly hindered my progress on > >>>> this > >>>>> KIP, and encouraged me to avoid increasing this logical coupling as > >> much > >>>> as > >>>>> possible. > >>>>> > >>>>> This also frees implementations to satisfy those requirements in any > >> way > >>>>> they choose. Transactions might not be the only/available approach to > >> an > >>>>> implementation, but they might have an alternative way to satisfy the > >>>>> isolation requirements. I admit that this point is more about > >> semantics, > >>>>> but "transactional" would need to be formally defined in order for > >>>>> implementers to provide a valid implementation, and these > >> IsolationLevels > >>>>> provide that formal definition. > >>>>> > >>>>> 4. > >>>>> I can remove them. I added them only as I planned to include them in > >> the > >>>>> org.apache.kafka.streams.state package, as a recommended base > >>>>> implementation for all StateStores, including those implemented by > >>>> users. I > >>>>> had assumed that anything in "public" packages, such as > >>>>> org.apache.kafka.streams.state, should be included in a KIP. Is that > >>>> wrong? > >>>>> > >>>>> 5. > >>>>> RocksDB provides no way to measure the actual size of a > >>>>> WriteBatch(WithIndex), so we're limited to tracking the sum total of > >> the > >>>>> size of keys + values that are written to the transaction. This > >> obviously > >>>>> under-estimates the actual memory usage, because WriteBatch no-doubt > >>>>> includes some record overheads, and WriteBatchWithIndex has to > maintain > >>>> an > >>>>> index. > >>>>> > >>>>> Ideally, we could trivially add a method upstream to > >> WriteBatchInterface > >>>>> that provides the exact size of the batch, but that would require an > >>>>> upgrade of RocksDB, which won't happen soon. 
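(Concretely, the approximation I'm describing is just a running sum of the serialized key and value sizes added to the batch; a hypothetical sketch:)

import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;

// Sketch: approximate the memory footprint of the uncommitted batch by summing
// key/value sizes. This under-estimates real usage (no per-record overhead and
// no index accounting), hence the "approximate" in the proposed method name.
public class UncommittedBytesTracker {
    private final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
    private long approximateUncommittedBytes = 0;

    public void put(final byte[] key, final byte[] value) throws RocksDBException {
        batch.put(key, value);
        approximateUncommittedBytes += key.length + value.length;
    }

    public long approximateNumUncommittedBytes() {
        return approximateUncommittedBytes;
    }
}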
So for the time being, > >> we're > >>>>> stuck with an approximation, so I felt that the new method should > >> reflect > >>>>> that. > >>>>> > >>>>> Would you prefer the new method name ignores this constraint and that > >> we > >>>>> simply make the rocks measurement more accurate in the future? > >>>>> > >>>>> 6. > >>>>> Done > >>>>> > >>>>> 7. > >>>>> Very good point. The KIP already specifically calls out memory in the > >>>>> documentation of the config: "Maximum number of memory bytes to be > used > >>>> to > >>>>> buffer uncommitted state-store records." - did you have something > else > >> in > >>>>> mind? > >>>>> > >>>>> Should we also make this clearer by renaming the config property > >> itself? > >>>>> Perhaps to something like statestore.transaction.buffer.max.bytes? > >>>>> > >>>>> 8. > >>>>> OK, I can remove this. The intent here was to describe how Streams > >> itself > >>>>> will manage transaction roll-over etc. Presumably that means we also > >>>> don't > >>>>> need a description of how Streams will manage the commit of changelog > >>>>> transactions, state store transactions and checkpointing? > >>>>> > >>>>> 9. > >>>>> What do you mean by fail-over? Do you mean failing over an Active > Task > >> to > >>>>> an instance already hosting a Standby Task? > >>>>> > >>>>> Thanks again and sorry for the essay of a response! > >>>>> > >>>>> Regards, > >>>>> Nick > >>>>> > >>>>> On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> > >> wrote: > >>>>> > >>>>>> Hi Nick, > >>>>>> > >>>>>> Thanks for the updates! > >>>>>> > >>>>>> I really appreciate that you simplified the KIP by removing some > >>>>>> aspects. As I have already told you, I think the removed aspects are > >>>>>> also good ideas and we can discuss them on follow-up KIPs. > >>>>>> > >>>>>> Regarding the current KIP, I have the following feedback. > >>>>>> > >>>>>> 1. > >>>>>> Is there a good reason to add method newTransaction() to the > >> StateStore > >>>>>> interface? As far as I understand, the idea is that users of a state > >>>>>> store (transactional or not) call this method at start-up and after > >> each > >>>>>> commit. Since the call to newTransaction() is done in any case and I > >>>>>> think it would simplify the caller code if we just start a new > >>>>>> transaction after a commit in the implementation? > >>>>>> As far as I understand, you plan to commit the transaction in the > >>>>>> flush() method. I find the idea to replace flush() with commit() > >>>>>> presented in KIP-844 an elegant solution. > >>>>>> > >>>>>> 2. > >>>>>> Why is the method to query the isolation level added to the state > >> store > >>>>>> context? > >>>>>> > >>>>>> 3. > >>>>>> Do we need all the isolation level definitions? I think it is good > to > >>>>>> know the guarantees of the transactionality of the state store. > >>>>>> However, currently, Streams guarantees that there will only be one > >>>>>> transaction that writes to the state store. Only the stream thread > >> that > >>>>>> executes the active task that owns the state store will write to the > >>>>>> state store. I think it should be enough to know if the state store > is > >>>>>> transactional or not. So my proposal would be to just add a method > on > >>>>>> the state store interface the returns if a state store is > >> transactional > >>>>>> or not by returning a boolean or an enum. > >>>>>> > >>>>>> 4. > >>>>>> I am wondering why AbstractTransaction and > AbstractTransactionalStore > >>>>>> are part of the KIP. 
They look like implementation details that > should > >>>>>> not be exposed in the public API. > >>>>>> > >>>>>> 5. > >>>>>> Why does StateStore#approximateNumUncommittedBytes() return an > >>>>>> approximate number of bytes? > >>>>>> > >>>>>> 6. > >>>>>> RocksDB is just one implementation of the state stores in Streams. > >>>>>> However, the issues regarding OOM errors might also apply to other > >>>>>> custom implementations. So in the KIP I would extract that part from > >>>>>> section "RocksDB Transaction". I would also move section "RocksDB > >>>>>> Transaction" to the end of section "Proposed Changes" and handle it > as > >>>>>> an example implementation for a state store. > >>>>>> > >>>>>> 7. > >>>>>> Should statestore.uncommitted.max.bytes only limit the uncommitted > >> bytes > >>>>>> or the uncommitted bytes that reside in memory? In future, other > >>>>>> transactional state store implementations might implement a buffer > for > >>>>>> uncommitted records that are able to spill records on disk. I think > >>>>>> statestore.uncommitted.max.bytes needs to limit the uncommitted > bytes > >>>>>> irrespective if they reside in memory or disk. Since Streams will > use > >>>>>> this config to decide if it needs to trigger a commit, state store > >>>>>> implementations that can spill to disk will never be able to spill > to > >>>>>> disk. You would only need to change the doc of the config, if you > >> agree > >>>>>> with me. > >>>>>> > >>>>>> 8. > >>>>>> Section "Transaction Management" about the wrappers is rather a > >>>>>> implementation detail that should not be in the KIP. > >>>>>> > >>>>>> 9. > >>>>>> Could you add a section that describes how failover will work with > the > >>>>>> transactional state stores? I think section "Error handling" is > >> already > >>>>>> a good start. > >>>>>> > >>>>>> > >>>>>> Best, > >>>>>> Bruno > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> On 15.05.23 11:04, Nick Telford wrote: > >>>>>>> Hi everyone, > >>>>>>> > >>>>>>> Quick update: I've added a new section to the KIP: "Offsets for > >>>> Consumer > >>>>>>> Rebalances", that outlines my solution to the problem that > >>>>>>> StreamsPartitionAssignor needs to read StateStore offsets even if > >>>> they're > >>>>>>> not currently open. > >>>>>>> > >>>>>>> Regards, > >>>>>>> Nick > >>>>>>> > >>>>>>> On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> > >>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Bruno, > >>>>>>>> > >>>>>>>> Thanks for reviewing my proposal. > >>>>>>>> > >>>>>>>> 1. > >>>>>>>> The main reason I added it was because it was easy to do. If we > see > >> no > >>>>>>>> value in it, I can remove it. > >>>>>>>> > >>>>>>>> 2. > >>>>>>>> Global StateStores can have multiple partitions in their input > >> topics > >>>>>>>> (which function as their changelogs), so they would have more than > >> one > >>>>>>>> partition. > >>>>>>>> > >>>>>>>> 3. > >>>>>>>> That's a good point. At present, the only method it adds is > >>>>>>>> isolationLevel(), which is likely not necessary outside of > >>>> StateStores. > >>>>>>>> It *does* provide slightly different guarantees in the > documentation > >>>> to > >>>>>>>> several of the methods (hence the overrides). I'm not sure if this > >> is > >>>>>>>> enough to warrant a new interface though. > >>>>>>>> I think the question that remains is whether this interface makes > it > >>>>>>>> easier to implement custom transactional StateStores than if we > were > >>>> to > >>>>>>>> remove it? Probably not. > >>>>>>>> > >>>>>>>> 4. 
> >>>>>>>> The main motivation for the Atomic Checkpointing is actually > >>>>>> performance. > >>>>>>>> My team has been testing out an implementation of this KIP without > >> it, > >>>>>> and > >>>>>>>> we had problems with RocksDB doing *much* more compaction, due to > >> the > >>>>>>>> significantly increased flush rate. It was enough of a problem > that > >>>> (for > >>>>>>>> the time being), we had to revert back to Kafka Streams proper. > >>>>>>>> I think the best way to solve this, as you say, is to keep the > >>>>>> .checkpoint > >>>>>>>> files *in addition* to the offsets being stored within the store > >>>> itself. > >>>>>>>> Essentially, when closing StateStores, we force a memtable flush, > >> then > >>>>>>>> call getCommittedOffsets and write those out to the .checkpoint > >> file. > >>>>>>>> That would ensure the metadata is available to the > >>>>>>>> StreamsPartitionAssignor for all closed stores. > >>>>>>>> If there's a crash (no clean close), then we won't be able to > >>>> guarantee > >>>>>>>> which offsets were flushed to disk by RocksDB, so we'd need to > open > >> ( > >>>>>>>> init()), read offsets, and then close() those stores. But since > this > >>>> is > >>>>>>>> the exception, and will only occur once (provided it doesn't crash > >>>> every > >>>>>>>> time!), I think the performance impact here would be acceptable. > >>>>>>>> > >>>>>>>> Thanks for the feedback, please let me know if you have any more > >>>>>> comments > >>>>>>>> or questions! > >>>>>>>> > >>>>>>>> I'm currently working on rebasing against trunk. This involves > >> adding > >>>>>>>> support for transactionality to VersionedStateStores. I will > >> probably > >>>>>> need > >>>>>>>> to revise my implementation for transactional "segmented" stores, > >> both > >>>>>> to > >>>>>>>> accommodate VersionedStateStore, and to clean up some other stuff. > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> Nick > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> > >>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Nick, > >>>>>>>>> > >>>>>>>>> Thanks for the updates! > >>>>>>>>> > >>>>>>>>> I have a couple of questions/comments. > >>>>>>>>> > >>>>>>>>> 1. > >>>>>>>>> Why do you propose a configuration that involves max. bytes and > >> max. > >>>>>>>>> reords? I think we are mainly concerned about memory consumption > >>>>>> because > >>>>>>>>> we want to limit the off-heap memory used. I cannot think of a > case > >>>>>>>>> where one would want to set the max. number of records. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 2. > >>>>>>>>> Why does > >>>>>>>>> > >>>>>>>>> default void commit(final Map<TopicPartition, Long> > >>>>>> changelogOffsets) { > >>>>>>>>> flush(); > >>>>>>>>> } > >>>>>>>>> > >>>>>>>>> take a map of partitions to changelog offsets? > >>>>>>>>> The mapping between state stores to partitions is a 1:1 > >> relationship. > >>>>>>>>> Passing in a single changelog offset should suffice. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 3. > >>>>>>>>> Why do we need the Transaction interface? It should be possible > to > >>>> hide > >>>>>>>>> beginning and committing a transactions withing the state store > >>>>>>>>> implementation, so that from outside the state store, it does not > >>>>>> matter > >>>>>>>>> whether the state store is transactional or not. What would be > the > >>>>>>>>> advantage of using the Transaction interface? > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> 4. 
> >>>>>>>>> Regarding checkpointing offsets, I think we should keep the > >>>> checkpoint > >>>>>>>>> file in any case for the reason you mentioned about rebalancing. > >> Even > >>>>>> if > >>>>>>>>> that would not be an issue, I would propose to move the change to > >>>>>> offset > >>>>>>>>> management to a new KIP and to not add more complexity than > needed > >> to > >>>>>>>>> this one. I would not be too concerned about the consistency > >>>> violation > >>>>>>>>> you mention. As far as I understand, with transactional state > >> stores > >>>>>>>>> Streams would write the checkpoint file during every commit even > >>>> under > >>>>>>>>> EOS. In the failure case you describe, Streams would restore the > >>>> state > >>>>>>>>> stores from the offsets found in the checkpoint file written > during > >>>> the > >>>>>>>>> penultimate commit instead of during the last commit. Basically, > >>>>>> Streams > >>>>>>>>> would overwrite the records written to the state store between > the > >>>> last > >>>>>>>>> two commits with the same records read from the changelogs. > While I > >>>>>>>>> understand that this is wasteful, it is -- at the same time -- > >>>>>>>>> acceptable and most importantly it does not break EOS. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Bruno > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 27.04.23 12:34, Nick Telford wrote: > >>>>>>>>>> Hi everyone, > >>>>>>>>>> > >>>>>>>>>> I find myself (again) considering removing the offset management > >>>> from > >>>>>>>>>> StateStores, and keeping the old checkpoint file system. The > >> reason > >>>> is > >>>>>>>>> that > >>>>>>>>>> the StreamPartitionAssignor directly reads checkpoint files in > >> order > >>>>>> to > >>>>>>>>>> determine which instance has the most up-to-date copy of the > local > >>>>>>>>> state. > >>>>>>>>>> If we move offsets into the StateStore itself, then we will need > >> to > >>>>>>>>> open, > >>>>>>>>>> initialize, read offsets and then close each StateStore (that is > >> not > >>>>>>>>>> already assigned and open) for which we have *any* local state, > on > >>>>>> every > >>>>>>>>>> rebalance. > >>>>>>>>>> > >>>>>>>>>> Generally, I don't think there are many "orphan" stores like > this > >>>>>>>>> sitting > >>>>>>>>>> around on most instances, but even a few would introduce > >> additional > >>>>>>>>> latency > >>>>>>>>>> to an already somewhat lengthy rebalance procedure. > >>>>>>>>>> > >>>>>>>>>> I'm leaning towards Colt's (Slack) suggestion of just keeping > >> things > >>>>>> in > >>>>>>>>> the > >>>>>>>>>> checkpoint file(s) for now, and not worrying about the race. The > >>>>>>>>> downside > >>>>>>>>>> is that we wouldn't be able to remove the explicit RocksDB flush > >>>>>>>>> on-commit, > >>>>>>>>>> which likely hurts performance. > >>>>>>>>>> > >>>>>>>>>> If anyone has any thoughts or ideas on this subject, I would > >>>>>> appreciate > >>>>>>>>> it! > >>>>>>>>>> > >>>>>>>>>> Regards, > >>>>>>>>>> Nick > >>>>>>>>>> > >>>>>>>>>> On Wed, 19 Apr 2023 at 15:05, Nick Telford < > >> nick.telf...@gmail.com> > >>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Colt, > >>>>>>>>>>> > >>>>>>>>>>> The issue is that if there's a crash between 2 and 3, then you > >>>> still > >>>>>>>>> end > >>>>>>>>>>> up with inconsistent data in RocksDB. 
The only way to guarantee > >>>> that > >>>>>>>>> your > >>>>>>>>>>> checkpoint offsets and locally stored data are consistent with > >> each > >>>>>>>>> other > >>>>>>>>>>> are to atomically commit them, which can be achieved by having > >> the > >>>>>>>>> offsets > >>>>>>>>>>> stored in RocksDB. > >>>>>>>>>>> > >>>>>>>>>>> The offsets column family is likely to be extremely small (one > >>>>>>>>>>> per-changelog partition + one per Topology input partition for > >>>>>> regular > >>>>>>>>>>> stores, one per input partition for global stores). So the > >> overhead > >>>>>>>>> will be > >>>>>>>>>>> minimal. > >>>>>>>>>>> > >>>>>>>>>>> A major benefit of doing this is that we can remove the > explicit > >>>>>> calls > >>>>>>>>> to > >>>>>>>>>>> db.flush(), which forcibly flushes memtables to disk on-commit. > >> It > >>>>>>>>> turns > >>>>>>>>>>> out, RocksDB memtable flushes are largely dictated by Kafka > >> Streams > >>>>>>>>>>> commits, *not* RocksDB configuration, which could be a major > >> source > >>>>>> of > >>>>>>>>>>> confusion. Atomic checkpointing makes it safe to remove these > >>>>>> explicit > >>>>>>>>>>> flushes, because it no longer matters exactly when RocksDB > >> flushes > >>>>>>>>> data to > >>>>>>>>>>> disk; since the data and corresponding checkpoint offsets will > >>>> always > >>>>>>>>> be > >>>>>>>>>>> flushed together, the local store is always in a consistent > >> state, > >>>>>> and > >>>>>>>>>>> on-restart, it can always safely resume restoration from the > >>>> on-disk > >>>>>>>>>>> offsets, restoring the small amount of data that hadn't been > >>>> flushed > >>>>>>>>> when > >>>>>>>>>>> the app exited/crashed. > >>>>>>>>>>> > >>>>>>>>>>> Regards, > >>>>>>>>>>> Nick > >>>>>>>>>>> > >>>>>>>>>>> On Wed, 19 Apr 2023 at 14:35, Colt McNealy < > c...@littlehorse.io> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Nick, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for your reply. Ack to A) and B). > >>>>>>>>>>>> > >>>>>>>>>>>> For item C), I see what you're referring to. Your proposed > >>>> solution > >>>>>>>>> will > >>>>>>>>>>>> work, so no need to change it. What I was suggesting was that > it > >>>>>>>>> might be > >>>>>>>>>>>> possible to achieve this with only one column family. So long > >> as: > >>>>>>>>>>>> > >>>>>>>>>>>> - No uncommitted records (i.e. not committed to the > >>>> changelog) > >>>>>> are > >>>>>>>>>>>> *committed* to the state store, AND > >>>>>>>>>>>> - The Checkpoint offset (which refers to the changelog > >>>> topic) > >>>>>> is > >>>>>>>>> less > >>>>>>>>>>>> than or equal to the last written changelog offset in > >>>> rocksdb > >>>>>>>>>>>> > >>>>>>>>>>>> I don't see the need to do the full restoration from scratch. > My > >>>>>>>>>>>> understanding was that prior to 844/892, full restorations > were > >>>>>>>>> required > >>>>>>>>>>>> because there could be uncommitted records written to RocksDB; > >>>>>>>>> however, > >>>>>>>>>>>> given your use of RocksDB transactions, that can be avoided > with > >>>> the > >>>>>>>>>>>> pattern of 1) commit Kafka transaction, 2) commit RocksDB > >>>>>>>>> transaction, 3) > >>>>>>>>>>>> update offset in checkpoint file. > >>>>>>>>>>>> > >>>>>>>>>>>> Anyways, your proposed solution works equivalently and I don't > >>>>>> believe > >>>>>>>>>>>> there is much overhead to an additional column family in > >> RocksDB. > >>>>>>>>> Perhaps > >>>>>>>>>>>> it may even perform better than making separate writes to the > >>>>>>>>> checkpoint > >>>>>>>>>>>> file. 
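(To illustrate the offsets column family idea with the RocksJava API; the names here are illustrative, not the actual implementation:)

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Sketch: the committed records and the changelog offset for this commit go
// into one WriteBatch spanning two column families, so RocksDB persists them
// atomically. The store can then never contain data without its matching
// offset (or vice versa), and no explicit db.flush() is needed on commit.
public class AtomicOffsetCommitSketch {

    static void commit(final RocksDB db,
                       final ColumnFamilyHandle dataCf,
                       final ColumnFamilyHandle offsetsCf,
                       final Map<byte[], byte[]> records,
                       final String changelogTopicPartition,
                       final long changelogOffset) throws RocksDBException {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions writeOptions = new WriteOptions()) {
            for (Map.Entry<byte[], byte[]> entry : records.entrySet()) {
                batch.put(dataCf, entry.getKey(), entry.getValue());
            }
            // one tiny record per changelog partition: the committed offset
            batch.put(offsetsCf,
                      changelogTopicPartition.getBytes(StandardCharsets.UTF_8),
                      ByteBuffer.allocate(Long.BYTES).putLong(changelogOffset).array());
            db.write(writeOptions, batch); // both column families, one atomic write
        }
    }
}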
> >>>>>>>>>>>> > >>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On Wed, Apr 19, 2023 at 5:53 AM Nick Telford < > >>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>> > >>>>>>>>>>>>> A. I've done my best to de-couple the StateStore stuff from > the > >>>>>> rest > >>>>>>>>> of > >>>>>>>>>>>> the > >>>>>>>>>>>>> Streams engine. The fact that there will be only one ongoing > >>>>>> (write) > >>>>>>>>>>>>> transaction at a time is not guaranteed by any API, and is > >> just a > >>>>>>>>>>>>> consequence of the way Streams operates. To that end, I tried > >> to > >>>>>>>>> ensure > >>>>>>>>>>>> the > >>>>>>>>>>>>> documentation and guarantees provided by the new APIs are > >>>>>>>>> independent of > >>>>>>>>>>>>> this incidental behaviour. In practice, you're right, this > >>>>>>>>> essentially > >>>>>>>>>>>>> refers to "interactive queries", which are technically "read > >>>>>>>>>>>> transactions", > >>>>>>>>>>>>> even if they don't actually use the transaction API to > isolate > >>>>>>>>>>>> themselves. > >>>>>>>>>>>>> > >>>>>>>>>>>>> B. Yes, although not ideal. This is for backwards > >> compatibility, > >>>>>>>>>>>> because: > >>>>>>>>>>>>> 1) Existing custom StateStore implementations will > >>>> implement > >>>>>>>>>>>> flush(), > >>>>>>>>>>>>> and not commit(), but the Streams engine now calls commit(), > so > >>>>>> those > >>>>>>>>>>>> calls > >>>>>>>>>>>>> need to be forwarded to flush() for these legacy stores. > >>>>>>>>>>>>> 2) Existing StateStore *users*, i.e. outside of the > >>>> Streams > >>>>>>>>> engine > >>>>>>>>>>>>> itself, may depend on explicitly calling flush(), so for > these > >>>>>> cases, > >>>>>>>>>>>>> flush() needs to be redirected to call commit(). > >>>>>>>>>>>>> If anyone has a better way to guarantee compatibility without > >>>>>>>>>>>> introducing > >>>>>>>>>>>>> this potential recursion loop, I'm open to changes! > >>>>>>>>>>>>> > >>>>>>>>>>>>> C. This is described in the "Atomic Checkpointing" section. > >>>> Offsets > >>>>>>>>> are > >>>>>>>>>>>>> stored in a separate RocksDB column family, which is > guaranteed > >>>> to > >>>>>> be > >>>>>>>>>>>>> atomically flushed to disk with all other column families. > The > >>>>>> issue > >>>>>>>>> of > >>>>>>>>>>>>> checkpoints being written to disk after commit causing > >>>>>> inconsistency > >>>>>>>>> if > >>>>>>>>>>>> it > >>>>>>>>>>>>> crashes in between is the reason why, under EOS, checkpoint > >> files > >>>>>> are > >>>>>>>>>>>> only > >>>>>>>>>>>>> written on clean shutdown. This is one of the major causes of > >>>> "full > >>>>>>>>>>>>> restorations", so moving the offsets into a place where they > >> can > >>>> be > >>>>>>>>>>>>> guaranteed to be atomically written with the data they > >> checkpoint > >>>>>>>>>>>> allows us > >>>>>>>>>>>>> to write the checkpoint offsets *on every commit*, not just > on > >>>>>> clean > >>>>>>>>>>>>> shutdown. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Regards, > >>>>>>>>>>>>> Nick > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Tue, 18 Apr 2023 at 15:39, Colt McNealy < > >> c...@littlehorse.io> > >>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thank you for continuing this work. I have a few minor > >>>> clarifying > >>>>>>>>>>>>>> questions. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> A) "Records written to any transaction are visible to all > >> other > >>>>>>>>>>>>>> transactions immediately." 
I am confused here—I thought > there > >>>>>> could > >>>>>>>>>>>> only > >>>>>>>>>>>>> be > >>>>>>>>>>>>>> one transaction going on at a time for a given state store > >> given > >>>>>> the > >>>>>>>>>>>>>> threading model for processing records on a Task. Do you > mean > >>>>>>>>>>>> Interactive > >>>>>>>>>>>>>> Queries by "other transactions"? (If so, then everything > makes > >>>>>>>>> sense—I > >>>>>>>>>>>>>> thought that since IQ were read-only then they didn't count > as > >>>>>>>>>>>>>> transactions). > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> B) Is it intentional that the default implementations of the > >>>>>> flush() > >>>>>>>>>>>> and > >>>>>>>>>>>>>> commit() methods in the StateStore class refer to each other > >> in > >>>>>> some > >>>>>>>>>>>> sort > >>>>>>>>>>>>>> of unbounded recursion? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> C) How will the getCommittedOffset() method work? At first I > >>>>>> thought > >>>>>>>>>>>> the > >>>>>>>>>>>>>> way to do it would be using a special key in the RocksDB > store > >>>> to > >>>>>>>>>>>> store > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> offset, and committing that with the transaction. But upon > >>>> second > >>>>>>>>>>>>> thought, > >>>>>>>>>>>>>> since restoration from the changelog is an idempotent > >>>> procedure, I > >>>>>>>>>>>> think > >>>>>>>>>>>>> it > >>>>>>>>>>>>>> would be fine to 1) commit the RocksDB transaction and then > 2) > >>>>>> write > >>>>>>>>>>>> the > >>>>>>>>>>>>>> offset to disk in a checkpoint file. If there is a crash > >> between > >>>>>> 1) > >>>>>>>>>>>> and > >>>>>>>>>>>>> 2), > >>>>>>>>>>>>>> I think the only downside is now we replay a few more > records > >>>> (at > >>>>>> a > >>>>>>>>>>>> cost > >>>>>>>>>>>>> of > >>>>>>>>>>>>>> <100ms). Am I missing something there? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Other than that, everything makes sense to me. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Tue, Apr 18, 2023 at 3:59 AM Nick Telford < > >>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I've updated the KIP to reflect the latest version of the > >>>> design: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>> > >>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> There are several changes in there that reflect feedback > from > >>>>>> this > >>>>>>>>>>>>>> thread, > >>>>>>>>>>>>>>> and there's a new section and a bunch of interface changes > >>>>>> relating > >>>>>>>>>>>> to > >>>>>>>>>>>>>>> Atomic Checkpointing, which is the final piece of the > puzzle > >> to > >>>>>>>>>>>> making > >>>>>>>>>>>>>>> everything robust. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Let me know what you think! > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, 3 Jan 2023 at 11:33, Nick Telford < > >>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Lucas, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for looking over my KIP. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> A) The bound is per-instance, not per-Task. This was a > typo > >> in > >>>>>> the > >>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>> that I've now corrected. 
It was originally per-Task, but I > >>>>>>>>>>>> changed it > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>> per-instance for exactly the reason you highlighted. > >>>>>>>>>>>>>>>> B) It's worth noting that transactionality is only enabled > >>>> under > >>>>>>>>>>>> EOS, > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> in the default mode of operation (ALOS), there should be > no > >>>>>>>>>>>> change in > >>>>>>>>>>>>>>>> behavior at all. I think, under EOS, we can mitigate the > >>>> impact > >>>>>> on > >>>>>>>>>>>>>> users > >>>>>>>>>>>>>>> by > >>>>>>>>>>>>>>>> sufficiently low default values for the memory bound > >>>>>>>>>>>> configuration. I > >>>>>>>>>>>>>>>> understand your hesitation to include a significant change > >> of > >>>>>>>>>>>>>> behaviour, > >>>>>>>>>>>>>>>> especially in a minor release, but I suspect that most > users > >>>>>> will > >>>>>>>>>>>>>> prefer > >>>>>>>>>>>>>>>> the memory impact (under EOS) to the existing behaviour of > >>>>>>>>>>>> frequent > >>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>> restorations! If this is a problem, the changes can wait > >> until > >>>>>> the > >>>>>>>>>>>>> next > >>>>>>>>>>>>>>>> major release. I'll be running a patched version of > streams > >> in > >>>>>>>>>>>>>> production > >>>>>>>>>>>>>>>> with these changes as soon as they're ready, so it won't > >>>> disrupt > >>>>>>>>>>>> me > >>>>>>>>>>>>> :-D > >>>>>>>>>>>>>>>> C) The main purpose of this sentence was just to note that > >>>> some > >>>>>>>>>>>>> changes > >>>>>>>>>>>>>>>> will need to be made to the way Segments are handled in > >> order > >>>> to > >>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>> they also benefit from transactions. At the time I wrote > >> it, I > >>>>>>>>>>>> hadn't > >>>>>>>>>>>>>>>> figured out the specific changes necessary, so it was > >>>>>> deliberately > >>>>>>>>>>>>>> vague. > >>>>>>>>>>>>>>>> This is the one outstanding problem I'm currently working > >> on, > >>>>>> and > >>>>>>>>>>>>> I'll > >>>>>>>>>>>>>>>> update this section with more detail once I have figured > out > >>>> the > >>>>>>>>>>>>> exact > >>>>>>>>>>>>>>>> changes required. > >>>>>>>>>>>>>>>> D) newTransaction() provides the necessary isolation > >>>> guarantees. > >>>>>>>>>>>>> While > >>>>>>>>>>>>>>>> the RocksDB implementation of transactions doesn't > >> technically > >>>>>>>>>>>> *need* > >>>>>>>>>>>>>>>> read-only users to call newTransaction(), other > >>>> implementations > >>>>>>>>>>>>> (e.g. a > >>>>>>>>>>>>>>>> hypothetical PostgresStore) may require it. Calling > >>>>>>>>>>>> newTransaction() > >>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>> no transaction is necessary is essentially free, as it > will > >>>> just > >>>>>>>>>>>>> return > >>>>>>>>>>>>>>>> this. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I didn't do any profiling of the KIP-844 PoC, but I think > it > >>>>>>>>>>>> should > >>>>>>>>>>>>> be > >>>>>>>>>>>>>>>> fairly obvious where the performance problems stem from: > >>>> writes > >>>>>>>>>>>> under > >>>>>>>>>>>>>>>> KIP-844 require 3 extra memory-copies: 1 to encode it with > >> the > >>>>>>>>>>>>>>>> tombstone/record flag, 1 to decode it from the > >>>> tombstone/record > >>>>>>>>>>>> flag, > >>>>>>>>>>>>>>> and 1 > >>>>>>>>>>>>>>>> to copy the record from the "temporary" store to the > "main" > >>>>>> store, > >>>>>>>>>>>>> when > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> transaction commits. 
The different approach taken by > KIP-869 > >>>>>>>>>>>> should > >>>>>>>>>>>>>>> perform > >>>>>>>>>>>>>>>> much better, as it avoids all these copies, and may > actually > >>>>>>>>>>>> perform > >>>>>>>>>>>>>>>> slightly better than trunk, due to batched writes in > RocksDB > >>>>>>>>>>>>> performing > >>>>>>>>>>>>>>>> better than non-batched writes.[1] > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> 1: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>> > >>>>>> > >> https://github.com/adamretter/rocksjava-write-methods-benchmark#results > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy < > >>>>>>>>>>>> lbruts...@confluent.io > >>>>>>>>>>>>>>> .invalid> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Nick, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I'm just starting to read up on the whole discussion > about > >>>>>>>>>>>> KIP-892 > >>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> KIP-844. Thanks a lot for your work on this, I do think > >>>>>>>>>>>>>>>>> `WriteBatchWithIndex` may be the way to go here. I do > have > >>>> some > >>>>>>>>>>>>>>>>> questions about the latest draft. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> A) If I understand correctly, you propose to put a > >> bound > >>>> on > >>>>>> the > >>>>>>>>>>>>>>>>> (native) memory consumed by each task. However, I wonder > if > >>>>>> this > >>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> sufficient if we have temporary imbalances in the > cluster. > >>>> For > >>>>>>>>>>>>>>>>> example, depending on the timing of rebalances during a > >>>> cluster > >>>>>>>>>>>>>>>>> restart, it could happen that a single streams node is > >>>>>> assigned a > >>>>>>>>>>>>> lot > >>>>>>>>>>>>>>>>> more tasks than expected. With your proposed change, this > >>>> would > >>>>>>>>>>>> mean > >>>>>>>>>>>>>>>>> that the memory required by this one node could be a > >> multiple > >>>>>> of > >>>>>>>>>>>>> what > >>>>>>>>>>>>>>>>> is required during normal operation. I wonder if it > >> wouldn't > >>>> be > >>>>>>>>>>>>> safer > >>>>>>>>>>>>>>>>> to put a global bound on the memory use, across all > tasks. > >>>>>>>>>>>>>>>>> B) Generally, the memory concerns still give me the > >>>> feeling > >>>>>>>>> that > >>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>> should not be enabled by default for all users in a minor > >>>>>>>>>>>> release. > >>>>>>>>>>>>>>>>> C) In section "Transaction Management": the > sentence > >> "A > >>>>>> similar > >>>>>>>>>>>>>>>>> analogue will be created to automatically manage > `Segment` > >>>>>>>>>>>>>>>>> transactions.". Maybe this is just me lacking some > >>>> background, > >>>>>>>>>>>> but I > >>>>>>>>>>>>>>>>> do not understand this, it would be great if you could > >>>> clarify > >>>>>>>>>>>> what > >>>>>>>>>>>>>>>>> you mean here. > >>>>>>>>>>>>>>>>> D) Could you please clarify why IQ has to call > >>>>>>>>> newTransaction(), > >>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>> it's read-only. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> And one last thing not strictly related to your KIP: if > >> there > >>>>>> is > >>>>>>>>>>>> an > >>>>>>>>>>>>>>>>> easy way for you to find out why the KIP-844 PoC is 20x > >>>> slower > >>>>>>>>>>>> (e.g. > >>>>>>>>>>>>>>>>> by providing a flame graph), that would be quite > >> interesting. 
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>> Lucas > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Thu, Dec 22, 2022 at 8:30 PM Nick Telford < > >>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I've updated the KIP with a more detailed design, which > >>>>>>>>>>>> reflects > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> implementation I've been working on: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>> > >>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> This new design should address the outstanding points > >>>> already > >>>>>>>>>>>> made > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> thread. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Please let me know if there are areas that are unclear > or > >>>> need > >>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>> clarification. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I have a (nearly) working implementation. I'm confident > >> that > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> remaining > >>>>>>>>>>>>>>>>>> work (making Segments behave) will not impact the > >> documented > >>>>>>>>>>>>> design. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Tue, 6 Dec 2022 at 19:24, Colt McNealy < > >>>>>> c...@littlehorse.io > >>>>>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thank you for the reply; that makes sense. I was hoping > >>>> that, > >>>>>>>>>>>>>> since > >>>>>>>>>>>>>>>>> reading > >>>>>>>>>>>>>>>>>>> uncommitted records from IQ in EOS isn't part of the > >>>>>>>>>>>> documented > >>>>>>>>>>>>>> API, > >>>>>>>>>>>>>>>>> maybe > >>>>>>>>>>>>>>>>>>> you *wouldn't* have to wait for the next major release > to > >>>>>>>>>>>> make > >>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>> change; > >>>>>>>>>>>>>>>>>>> but given that it would be considered a major change, I > >>>> like > >>>>>>>>>>>>> your > >>>>>>>>>>>>>>>>> approach > >>>>>>>>>>>>>>>>>>> the best. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Wishing you a speedy recovery and happy coding! > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford < > >>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 10: Yes, I agree it's not ideal. I originally intended > >> to > >>>>>>>>>>>> try > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> keep the > >>>>>>>>>>>>>>>>>>>> behaviour unchanged as much as possible, otherwise > we'd > >>>>>>>>>>>> have > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> wait for > >>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>> major version release to land these changes. > >>>>>>>>>>>>>>>>>>>> 20: Good point, ALOS doesn't need the same level of > >>>>>>>>>>>> guarantee, > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> typically longer commit intervals would be problematic > >>>> when > >>>>>>>>>>>>>>> reading > >>>>>>>>>>>>>>>>> only > >>>>>>>>>>>>>>>>>>>> "committed" records. 
> >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I've been away for 5 days recovering from minor > surgery, > >>>>>>>>>>>> but I > >>>>>>>>>>>>>>>>> spent a > >>>>>>>>>>>>>>>>>>>> considerable amount of that time working through ideas > >> for > >>>>>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>>>> solutions in my head. I think your suggestion of > keeping > >>>>>>>>>>>> ALOS > >>>>>>>>>>>>>>>>> as-is, but > >>>>>>>>>>>>>>>>>>>> buffering writes for EOS is the right path forwards, > >>>>>>>>>>>> although > >>>>>>>>>>>>> I > >>>>>>>>>>>>>>>>> have a > >>>>>>>>>>>>>>>>>>>> solution that both expands on this, and provides for > >> some > >>>>>>>>>>>> more > >>>>>>>>>>>>>>>>> formal > >>>>>>>>>>>>>>>>>>>> guarantees. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Essentially, adding support to KeyValueStores for > >>>>>>>>>>>>>> "Transactions", > >>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>> clearly defined IsolationLevels. Using "Read > Committed" > >>>>>>>>>>>> when > >>>>>>>>>>>>>> under > >>>>>>>>>>>>>>>>> EOS, > >>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>> "Read Uncommitted" under ALOS. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> The nice thing about this approach is that it gives us > >>>> much > >>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>> clearly > >>>>>>>>>>>>>>>>>>>> defined isolation behaviour that can be properly > >>>>>>>>>>>> documented to > >>>>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>>>> users > >>>>>>>>>>>>>>>>>>>> know what to expect. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I'm still working out the kinks in the design, and > will > >>>>>>>>>>>> update > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>> I have something. The main struggle is trying to > >> implement > >>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>> without > >>>>>>>>>>>>>>>>>>>> making any major changes to the existing interfaces or > >>>>>>>>>>>>> breaking > >>>>>>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>>>> implementations, because currently everything expects > to > >>>>>>>>>>>>> operate > >>>>>>>>>>>>>>>>> directly > >>>>>>>>>>>>>>>>>>>> on a StateStore, and not a Transaction of that store. > I > >>>>>>>>>>>> think > >>>>>>>>>>>>>> I'm > >>>>>>>>>>>>>>>>> getting > >>>>>>>>>>>>>>>>>>>> close, although sadly I won't be able to progress much > >>>>>>>>>>>> until > >>>>>>>>>>>>>> next > >>>>>>>>>>>>>>>>> week > >>>>>>>>>>>>>>>>>>> due > >>>>>>>>>>>>>>>>>>>> to some work commitments. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Thu, 1 Dec 2022 at 00:01, Colt McNealy < > >>>>>>>>>>>>> c...@littlehorse.io> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Nick, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thank you for the explanation, and also for the > updated > >>>>>>>>>>>>> KIP. I > >>>>>>>>>>>>>>> am > >>>>>>>>>>>>>>>>> quite > >>>>>>>>>>>>>>>>>>>>> eager for this improvement to be released as it would > >>>>>>>>>>>>> greatly > >>>>>>>>>>>>>>>>> reduce > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> operational difficulties of EOS streams apps. 
> >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Two questions: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 10) > >>>>>>>>>>>>>>>>>>>>>> When reading records, we will use the > >>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex#getFromBatchAndDB > >>>>>>>>>>>>>>>>>>>>> and WriteBatchWithIndex#newIteratorWithBase > >>>> utilities in > >>>>>>>>>>>>>> order > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> ensure > >>>>>>>>>>>>>>>>>>>>> that uncommitted writes are available to query. > >>>>>>>>>>>>>>>>>>>>> Why do extra work to enable the reading of > uncommitted > >>>>>>>>>>>>> writes > >>>>>>>>>>>>>>>>> during > >>>>>>>>>>>>>>>>>>> IQ? > >>>>>>>>>>>>>>>>>>>>> Code complexity aside, reading uncommitted writes is, > >> in > >>>>>>>>>>>> my > >>>>>>>>>>>>>>>>> opinion, a > >>>>>>>>>>>>>>>>>>>>> minor flaw in EOS IQ; it would be very nice to have > the > >>>>>>>>>>>>>>> guarantee > >>>>>>>>>>>>>>>>> that, > >>>>>>>>>>>>>>>>>>>>> with EOS, IQ only reads committed records. In order > to > >>>>>>>>>>>> avoid > >>>>>>>>>>>>>>> dirty > >>>>>>>>>>>>>>>>>>> reads, > >>>>>>>>>>>>>>>>>>>>> one currently must query a standby replica (but this > >>>>>>>>>>>> still > >>>>>>>>>>>>>>> doesn't > >>>>>>>>>>>>>>>>>>> fully > >>>>>>>>>>>>>>>>>>>>> guarantee monotonic reads). > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> 20) Is it also necessary to enable this optimization > on > >>>>>>>>>>>> ALOS > >>>>>>>>>>>>>>>>> stores? > >>>>>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>>>> motivation of KIP-844 was mainly to reduce the need > to > >>>>>>>>>>>>> restore > >>>>>>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>>>>> scratch on unclean EOS shutdowns; with ALOS it was > >>>>>>>>>>>>> acceptable > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> accept > >>>>>>>>>>>>>>>>>>>>> that there may have been uncommitted writes on disk. > >> On a > >>>>>>>>>>>>> side > >>>>>>>>>>>>>>>>> note, if > >>>>>>>>>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>>>>> enable this type of store on ALOS processors, the > >>>>>>>>>>>> community > >>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>>>> definitely want to enable queries on dirty reads; > >>>>>>>>>>>> otherwise > >>>>>>>>>>>>>>> users > >>>>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>>>>> have to wait 30 seconds (default) to see an update. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thank you for doing this fantastic work! > >>>>>>>>>>>>>>>>>>>>> Colt McNealy > >>>>>>>>>>>>>>>>>>>>> *Founder, LittleHorse.io* > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Wed, Nov 30, 2022 at 10:44 AM Nick Telford < > >>>>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I've drastically reduced the scope of this KIP to no > >>>>>>>>>>>>> longer > >>>>>>>>>>>>>>>>> include > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> StateStore management of checkpointing. This can be > >>>>>>>>>>>> added > >>>>>>>>>>>>>> as a > >>>>>>>>>>>>>>>>> KIP > >>>>>>>>>>>>>>>>>>>> later > >>>>>>>>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>>>>>>>> to further optimize the consistency and performance > of > >>>>>>>>>>>>> state > >>>>>>>>>>>>>>>>> stores. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I've also added a section discussing some of the > >>>>>>>>>>>> concerns > >>>>>>>>>>>>>>> around > >>>>>>>>>>>>>>>>>>>>>> concurrency, especially in the presence of > Iterators. 
> >>>>>>>>>>>> I'm > >>>>>>>>>>>>>>>>> thinking of > >>>>>>>>>>>>>>>>>>>>>> wrapping WriteBatchWithIndex with a > reference-counting > >>>>>>>>>>>>>>>>> copy-on-write > >>>>>>>>>>>>>>>>>>>>>> implementation (that only makes a copy if there's an > >>>>>>>>>>>>> active > >>>>>>>>>>>>>>>>>>> iterator), > >>>>>>>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>>>> I'm open to suggestions. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>>> Nick > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> On Mon, 28 Nov 2022 at 16:36, Nick Telford < > >>>>>>>>>>>>>>>>> nick.telf...@gmail.com> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Hi Colt, > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> I didn't do any profiling, but the 844 > >>>>>>>>>>>> implementation: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> - Writes uncommitted records to a temporary > >>>>>>>>>>>> RocksDB > >>>>>>>>>>>>>>>>> instance > >>>>>>>>>>>>>>>>>>>>>>> - Since tombstones need to be flagged, > all > >>>>>>>>>>>> record > >>>>>>>>>>>>>>>>> values are > >>>>>>>>>>>>>>>>>>>>>>> prefixed with a value/tombstone marker. > >> This > >>>>>>>>>>>>>>>>> necessitates a > >>>>>>>>>>>>>>>>>>>>> memory > >>>>>>>>>>>>>>>>>>>>>> copy. > >>>>>>>>>>>>>>>>>>>>>>> - On-commit, iterates all records in this > >>>>>>>>>>>> temporary > >>>>>>>>>>>>>>>>> instance and > >>>>>>>>>>>>>>>>>>>>>>> writes them to the main RocksDB store. > >>>>>>>>>>>>>>>>>>>>>>> - While iterating, the value/tombston