Hi Alex,

Excellent proposal, I'm very keen to see this land!
Would it be useful to permit configuring the type of store used for uncommitted offsets on a store-by-store basis? This way, users could choose whether to use, e.g. an in-memory store or RocksDB, potentially reducing the overheads associated with RocksDb for smaller stores, but without the memory pressure issues? I suspect that in most cases, the number of uncommitted records will be very small, because the default commit interval is 100ms. Regards, Nick On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com> wrote: > Hello Alex, > > Thanks for the updated KIP, I looked over it and browsed the WIP and just > have a couple meta thoughts: > > 1) About the param passed into the `recover()` function: it seems to me > that the semantics of "recover(offset)" is: recover this state to a > transaction boundary which is at least the passed-in offset. And the only > possibility that the returned offset is different than the passed-in offset > is that if the previous failure happens after we've done all the commit > procedures except writing the new checkpoint, in which case the returned > offset would be larger than the passed-in offset. Otherwise it should > always be equal to the passed-in offset, is that right? > > 2) It seems the only use for the "transactional()" function is to determine > if we can update the checkpoint file while in EOS. But the purpose of the > checkpoint file's offsets is just to tell "the local state's current > snapshot's progress is at least the indicated offsets" anyways, and with > this KIP maybe we would just do: > > a) when in ALOS, upon failover: we set the starting offset as > checkpointed-offset, then restore() from changelog till the end-offset. > This way we may restore some records twice. > b) when in EOS, upon failover: we first call recover(checkpointed-offset), > then set the starting offset as the returned offset (which may be larger > than checkpointed-offset), then restore until the end-offset. > > So why not also: > c) we let the `commit()` function to also return an offset, which indicates > "checkpointable offsets". > d) for existing non-transactional stores, we just have a default > implementation of "commit()" which is simply a flush, and returns a > sentinel value like -1. Then later if we get checkpointable offsets -1, we > do not write the checkpoint. Upon clean shutting down we can just > checkpoint regardless of the returned value from "commit". > e) for existing non-transactional stores, we just have a default > implementation of "recover()" which is to wipe out the local store and > return offset 0 if the passed in offset is -1, otherwise if not -1 then it > indicates a clean shutdown in the last run, can this function is just a > no-op. > > In that case, we would not need the "transactional()" function anymore, > since for non-transactional stores their behaviors are still wrapped in the > `commit / recover` function pairs. > > I have not completed the thorough pass on your WIP PR, so maybe I could > come up with some more feedback later, but just let me know if my > understanding above is correct or not? > > > Guozhang > > > > > On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov > <asorokou...@confluent.io.invalid> wrote: > > > Hi, > > > > I updated the KIP with the following changes: > > * Replaced in-memory batches with the secondary-store approach as the > > default implementation to address the feedback about memory pressure as > > suggested by Sagar and Bruno. 
> > * Introduced StateStore#commit and StateStore#recover methods as an > > extension of the rollback idea. @Guozhang, please see the comment below > on > > why I took a slightly different approach than you suggested. > > * Removed mentions of changes to IQv1 and IQv2. Transactional state > stores > > enable reading committed in IQ, but it is really an independent feature > > that deserves its own KIP. Conflating them unnecessarily increases the > > scope for discussion, implementation, and testing in a single unit of > work. > > > > I also published a prototype - > https://github.com/apache/kafka/pull/12393 > > that implements changes described in the proposal. > > > > Regarding explicit rollback, I think it is a powerful idea that allows > > other StateStore implementations to take a different path to the > > transactional behavior rather than keep 2 state stores. Instead of > > introducing a new commit token, I suggest using a changelog offset that > > already 1:1 corresponds to the materialized state. This works nicely > > because Kafka Stream first commits an AK transaction and only then > > checkpoints the state store, so we can use the changelog offset to commit > > the state store transaction. > > > > I called the method StateStore#recover rather than StateStore#rollback > > because a state store might either roll back or forward depending on the > > specific point of the crash failure.Consider the write algorithm in Kafka > > Streams is: > > 1. write stuff to the state store > > 2. producer.sendOffsetsToTransaction(token); > producer.commitTransaction(); > > 3. flush > > 4. checkpoint > > > > Let's consider 3 cases: > > 1. If the crash failure happens between #2 and #3, the state store rolls > > back and replays the uncommitted transaction from the changelog. > > 2. If the crash failure happens during #3, the state store can roll > forward > > and finish the flush/commit. > > 3. If the crash failure happens between #3 and #4, the state store should > > do nothing during recovery and just proceed with the checkpoint. > > > > Looking forward to your feedback, > > Alexander > > > > On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov < > > asorokou...@confluent.io> wrote: > > > > > Hi, > > > > > > As a status update, I did the following changes to the KIP: > > > * replaced configuration via the top-level config with configuration > via > > > Stores factory and StoreSuppliers, > > > * added IQv2 and elaborated how readCommitted will work when the store > is > > > not transactional, > > > * removed claims about ALOS. > > > > > > I am going to be OOO in the next couple of weeks and will resume > working > > > on the proposal and responding to the discussion in this thread > starting > > > June 27. My next top priorities are: > > > 1. Prototype the rollback approach as suggested by Guozhang. > > > 2. Replace in-memory batches with the secondary-store approach as the > > > default implementation to address the feedback about memory pressure as > > > suggested by Sagar and Bruno. > > > 3. Adjust Stores methods to make transactional implementations > pluggable. > > > 4. Publish the POC for the first review. > > > > > > Best regards, > > > Alex > > > > > > On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > >> Alex, > > >> > > >> Thanks for your replies! That is very helpful. 
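To make the StateStore#commit / StateStore#recover contract and the three recovery cases described above concrete, here is a minimal Java sketch. The interface shape, the -1 sentinel for non-transactional stores, and the collaborator names are illustrative assumptions drawn from this thread, not the actual KIP-844 or Kafka Streams signatures.

// Illustrative sketch only; method names, the -1 sentinel, and the collaborators
// are assumptions based on this discussion, not the actual KIP-844 interfaces.
public class CommitRecoverSketch {

    /** Minimal stand-in for the proposed StateStore additions. */
    interface TransactionalStore {
        void write(String key, String value);

        /**
         * Flush/commit everything written since the last commit and return the
         * changelog offset the materialized state now corresponds to, or -1 for
         * a non-transactional store (in which case no checkpoint is written).
         */
        long commit(long changelogOffset);

        /**
         * Bring the store to a transaction boundary that is at least the
         * checkpointed offset: roll back an uncommitted transaction, or roll
         * forward an interrupted flush. Returns the offset the store now reflects.
         */
        long recover(long checkpointedOffset);
    }

    /** Commit path: Kafka transaction first, then the store, then the checkpoint. */
    static void commitTask(TransactionalStore store, TxProducer producer,
                           Checkpoint checkpoint, long changelogOffset) {
        producer.sendOffsetsToTransaction(changelogOffset); // commit the Kafka transaction
        producer.commitTransaction();
        long committed = store.commit(changelogOffset);     // flush/commit the local state
        if (committed >= 0) {                                // only checkpoint transactional stores
            checkpoint.write(committed);
        }
    }

    /** Recovery path on restart: recover the store, then restore the remainder. */
    static void recoverTask(TransactionalStore store, Checkpoint checkpoint,
                            Changelog changelog) {
        long start = store.recover(checkpoint.readOrElse(-1L));
        changelog.replay(start, store::write);               // restore start..end-offset
    }

    // Hypothetical collaborators, defined only so the sketch is self-contained.
    interface TxProducer {
        void sendOffsetsToTransaction(long offset);
        void commitTransaction();
    }
    interface Checkpoint {
        void write(long offset);
        long readOrElse(long fallback);
    }
    interface Changelog {
        void replay(long fromOffset, java.util.function.BiConsumer<String, String> writer);
    }
}

The ordering in commitTask mirrors the algorithm above: commit the Kafka transaction first, then commit/flush the store, then write the checkpoint, so that on recovery the store either rolls back to the last committed changelog offset or rolls forward an interrupted flush.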
> > >> > > >> Just to broaden our discussions a bit here, I think there are some > other > > >> approaches in parallel to the idea of "enforce to only persist upon > > >> explicit flush" and I'd like to throw one here -- not really > advocating > > >> it, > > >> but just for us to compare the pros and cons: > > >> > > >> 1) We let the StateStore's `flush` function to return a token instead > of > > >> returning `void`. > > >> 2) We add another `rollback(token)` interface of StateStore which > would > > >> effectively rollback the state as indicated by the token to the > snapshot > > >> when the corresponding `flush` is called. > > >> 3) We encode the token and commit as part of > > >> `producer#sendOffsetsToTransaction`. > > >> > > >> Users could optionally implement the new functions, or they can just > not > > >> return the token at all and not implement the second function. Again, > > the > > >> APIs are just for the sake of illustration, not feeling they are the > > most > > >> natural :) > > >> > > >> Then the procedure would be: > > >> > > >> 1. the previous checkpointed offset is 100 > > >> ... > > >> 3. flush store, make sure all writes are persisted; get the returned > > token > > >> that indicates the snapshot of 200. > > >> 4. producer.sendOffsetsToTransaction(token); > > producer.commitTransaction(); > > >> 5. Update the checkpoint file (say, the new value is 200). > > >> > > >> Then if there's a failure, say between 3/4, we would get the token > from > > >> the > > >> last committed txn, and first we would do the restoration (which may > get > > >> the state to somewhere between 100 and 200), then call > > >> `store.rollback(token)` to rollback to the snapshot of offset 100. > > >> > > >> The pros is that we would then not need to enforce the state stores to > > not > > >> persist any data during the txn: for stores that may not be able to > > >> implement the `rollback` function, they can still reduce its impl to > > "not > > >> persisting any data" via this API, but for stores that can indeed > > support > > >> the rollback, their implementation may be more efficient. The cons > > though, > > >> on top of my head are 1) more complicated logic differentiating > between > > >> EOS > > >> with and without store rollback support, and ALOS, 2) encoding the > token > > >> as > > >> part of the commit offset is not ideal if it is big, 3) the recovery > > logic > > >> including the state store is also a bit more complicated. > > >> > > >> > > >> Guozhang > > >> > > >> > > >> > > >> > > >> > > >> On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov > > >> <asorokou...@confluent.io.invalid> wrote: > > >> > > >> > Hi Guozhang, > > >> > > > >> > But I'm still trying to clarify how it guarantees EOS, and it seems > > >> that we > > >> > > would achieve it by enforcing to not persist any data written > within > > >> this > > >> > > transaction until step 4. Is that correct? > > >> > > > >> > > > >> > This is correct. Both alternatives - in-memory WriteBatchWithIndex > and > > >> > transactionality via the secondary store guarantee EOS by not > > persisting > > >> > data in the "main" state store until it is committed in the > changelog > > >> > topic. > > >> > > > >> > Oh what I meant is not what KStream code does, but that StateStore > > impl > > >> > > classes themselves could potentially flush data to become > persisted > > >> > > asynchronously > > >> > > > >> > > > >> > Thank you for elaborating! 
You are correct, the underlying state > store > > >> > should not persist data until the streams app calls > StateStore#flush. > > >> There > > >> > are 2 options how a State Store implementation can guarantee that - > > >> either > > >> > keep uncommitted writes in memory or be able to roll back the > changes > > >> that > > >> > were not committed during recovery. RocksDB's WriteBatchWithIndex is > > an > > >> > implementation of the first option. A considered alternative, > > >> Transactions > > >> > via Secondary State Store for Uncommitted Changes, is the way to > > >> implement > > >> > the second option. > > >> > > > >> > As everyone correctly pointed out, keeping uncommitted data in > memory > > >> > introduces a very real risk of OOM that we will need to handle. The > > >> more I > > >> > think about it, the more I lean towards going with the Transactions > > via > > >> > Secondary Store as the way to implement transactionality as it does > > not > > >> > have that issue. > > >> > > > >> > Best, > > >> > Alex > > >> > > > >> > > > >> > On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <wangg...@gmail.com> > > >> wrote: > > >> > > > >> > > Hello Alex, > > >> > > > > >> > > > we flush the cache, but not the underlying state store. > > >> > > > > >> > > You're right. The ordering I mentioned above is actually: > > >> > > > > >> > > ... > > >> > > 3. producer.sendOffsetsToTransaction(); > > producer.commitTransaction(); > > >> > > 4. flush store, make sure all writes are persisted. > > >> > > 5. Update the checkpoint file to 200. > > >> > > > > >> > > But I'm still trying to clarify how it guarantees EOS, and it > seems > > >> that > > >> > we > > >> > > would achieve it by enforcing to not persist any data written > within > > >> this > > >> > > transaction until step 4. Is that correct? > > >> > > > > >> > > > Can you please point me to the place in the codebase where we > > >> trigger > > >> > > async flush before the commit? > > >> > > > > >> > > Oh what I meant is not what KStream code does, but that StateStore > > >> impl > > >> > > classes themselves could potentially flush data to become > persisted > > >> > > asynchronously, e.g. RocksDB does that naturally out of the > control > > of > > >> > > KStream code. I think it is related to my previous question: if we > > >> think > > >> > by > > >> > > guaranteeing EOS at the state store level, we would effectively > ask > > >> the > > >> > > impl classes that "you should not persist any data until `flush` > is > > >> > called > > >> > > explicitly", is the StateStore interface the right level to > enforce > > >> such > > >> > > mechanisms, or should we just do that on top of the StateStores, > > e.g. > > >> > > during the transaction we just keep all the writes in the cache > (of > > >> > course > > >> > > we need to consider how to work around memory pressure as > previously > > >> > > mentioned), and then upon committing, we just write the cached > > records > > >> > as a > > >> > > whole into the store and then call flush. > > >> > > > > >> > > > > >> > > Guozhang > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov > > >> > > <asorokou...@confluent.io.invalid> wrote: > > >> > > > > >> > > > Hey, > > >> > > > > > >> > > > Thank you for the wealth of great suggestions and questions! 
I > am > > >> going > > >> > > to > > >> > > > address the feedback in batches and update the proposal async, > as > > >> it is > > >> > > > probably going to be easier for everyone. I will also write a > > >> separate > > >> > > > message after making updates to the KIP. > > >> > > > > > >> > > > @John, > > >> > > > > > >> > > > > Did you consider instead just adding the option to the > > >> > > > > RocksDB*StoreSupplier classes and the factories in Stores ? > > >> > > > > > >> > > > Thank you for suggesting that. I think that this idea is better > > than > > >> > > what I > > >> > > > came up with and will update the KIP with configuring > > >> transactionality > > >> > > via > > >> > > > the suppliers and Stores. > > >> > > > > > >> > > > what is the advantage over just doing the same thing with the > > >> > RecordCache > > >> > > > > and not introducing the WriteBatch at all? > > >> > > > > > >> > > > Can you point me to RecordCache? I can't find it in the project. > > The > > >> > > > advantage would be that WriteBatch guarantees write atomicity. > As > > >> far > > >> > as > > >> > > I > > >> > > > understood the way RecordCache works, it might leave the system > in > > >> an > > >> > > > inconsistent state during crash failure on write. > > >> > > > > > >> > > > You mentioned that a transactional store can help reduce > > >> duplication in > > >> > > the > > >> > > > > case of ALOS > > >> > > > > > >> > > > I will remove claims about ALOS from the proposal. Thank you for > > >> > > > elaborating! > > >> > > > > > >> > > > As a reminder, we have a new IQv2 mechanism now. Should we > propose > > >> any > > >> > > > > changes to IQv1 to support this transactional mechanism, > versus > > >> just > > >> > > > > proposing it for IQv2? Certainly, it seems strange only to > > >> propose a > > >> > > > change > > >> > > > > for IQv1 and not v2. > > >> > > > > > >> > > > > > >> > > > I will update the proposal with complementary API changes for > > IQv2 > > >> > > > > > >> > > > What should IQ do if I request to readCommitted on a > > >> non-transactional > > >> > > > > store? > > >> > > > > > >> > > > We can assume that non-transactional stores commit on write, so > IQ > > >> > works > > >> > > in > > >> > > > the same way with non-transactional stores regardless of the > value > > >> of > > >> > > > readCommitted. > > >> > > > > > >> > > > > > >> > > > @Guozhang, > > >> > > > > > >> > > > * If we crash between line 3 and 4, then at that time the local > > >> > > persistent > > >> > > > > store image is representing as of offset 200, but upon > recovery > > >> all > > >> > > > > changelog records from 100 to log-end-offset would be > considered > > >> as > > >> > > > aborted > > >> > > > > and not be replayed and we would restart processing from > > position > > >> > 100. > > >> > > > > Restart processing will violate EOS.I'm not sure how e.g. > > >> RocksDB's > > >> > > > > WriteBatchWithIndex would make sure that the step 4 and step 5 > > >> could > > >> > be > > >> > > > > done atomically here. > > >> > > > > > >> > > > > > >> > > > Could you please point me to the place in the codebase where a > > task > > >> > > flushes > > >> > > > the store before committing the transaction? 
> > >> > > > Looking at TaskExecutor ( > > >> > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167 > > >> > > > ), > > >> > > > StreamTask#prepareCommit ( > > >> > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398 > > >> > > > ), > > >> > > > and CachedStateStore ( > > >> > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34 > > >> > > > ) > > >> > > > we flush the cache, but not the underlying state store. Explicit > > >> > > > StateStore#flush happens in AbstractTask#maybeWriteCheckpoint ( > > >> > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99 > > >> > > > ). > > >> > > > Is there something I am missing here? > > >> > > > > > >> > > > Today all cached data that have not been flushed are not > committed > > >> for > > >> > > > > sure, but even flushed data to the persistent underlying store > > may > > >> > also > > >> > > > be > > >> > > > > uncommitted since flushing can be triggered asynchronously > > before > > >> the > > >> > > > > commit. > > >> > > > > > >> > > > Can you please point me to the place in the codebase where we > > >> trigger > > >> > > async > > >> > > > flush before the commit? This would certainly be a reason to > > >> introduce > > >> > a > > >> > > > dedicated StateStore#commit method. > > >> > > > > > >> > > > Thanks again for the feedback. I am going to update the KIP and > > then > > >> > > > respond to the next batch of questions and suggestions. > > >> > > > > > >> > > > Best, > > >> > > > Alex > > >> > > > > > >> > > > On Mon, May 30, 2022 at 5:13 PM Suhas Satish > > >> > > <ssat...@confluent.io.invalid > > >> > > > > > > >> > > > wrote: > > >> > > > > > >> > > > > Thanks for the KIP proposal Alex. > > >> > > > > 1. Configuration default > > >> > > > > > > >> > > > > You mention applications using streams DSL with built-in > rocksDB > > >> > state > > >> > > > > store will get transactional state stores by default when EOS > is > > >> > > enabled, > > >> > > > > but the default implementation for apps using PAPI will > fallback > > >> to > > >> > > > > non-transactional behavior. > > >> > > > > Shouldn't we have the same default behavior for both types of > > >> apps - > > >> > > DSL > > >> > > > > and PAPI? > > >> > > > > > > >> > > > > On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna < > > cado...@apache.org > > >> > > > >> > > > wrote: > > >> > > > > > > >> > > > > > Thanks for the PR, Alex! > > >> > > > > > > > >> > > > > > I am also glad to see this coming. > > >> > > > > > > > >> > > > > > > > >> > > > > > 1. Configuration > > >> > > > > > > > >> > > > > > I would also prefer to restrict the configuration of > > >> transactional > > >> > on > > >> > > > > > the state sore. Ideally, calling method transactional() on > the > > >> > state > > >> > > > > > store would be enough. 
An option on the store builder would > > >> make it > > >> > > > > > possible to turn transactionality on and off (as John > > proposed). > > >> > > > > > > > >> > > > > > > > >> > > > > > 2. Memory usage in RocksDB > > >> > > > > > > > >> > > > > > This seems to be a major issue. We do not have any guarantee > > >> that > > >> > > > > > uncommitted writes fit into memory and I guess we will never > > >> have. > > >> > > What > > >> > > > > > happens when the uncommitted writes do not fit into memory? > > Does > > >> > > > RocksDB > > >> > > > > > throw an exception? Can we handle such an exception without > > >> > crashing? > > >> > > > > > > > >> > > > > > Does the RocksDB behavior even need to be included in this > > KIP? > > >> In > > >> > > the > > >> > > > > > end it is an implementation detail. > > >> > > > > > > > >> > > > > > What we should consider - though - is a memory limit in some > > >> form. > > >> > > And > > >> > > > > > what we do when the memory limit is exceeded. > > >> > > > > > > > >> > > > > > > > >> > > > > > 3. PoC > > >> > > > > > > > >> > > > > > I agree with Guozhang that a PoC is a good idea to better > > >> > understand > > >> > > > the > > >> > > > > > devils in the details. > > >> > > > > > > > >> > > > > > > > >> > > > > > Best, > > >> > > > > > Bruno > > >> > > > > > > > >> > > > > > On 25.05.22 01:52, Guozhang Wang wrote: > > >> > > > > > > Hello Alex, > > >> > > > > > > > > >> > > > > > > Thanks for writing the proposal! Glad to see it coming. I > > >> think > > >> > > this > > >> > > > is > > >> > > > > > the > > >> > > > > > > kind of a KIP that since too many devils would be buried > in > > >> the > > >> > > > details > > >> > > > > > and > > >> > > > > > > it's better to start working on a POC, either in parallel, > > or > > >> > > before > > >> > > > we > > >> > > > > > > resume our discussion, rather than blocking any > > implementation > > >> > > until > > >> > > > we > > >> > > > > > are > > >> > > > > > > satisfied with the proposal. > > >> > > > > > > > > >> > > > > > > Just as a concrete example, I personally am still not 100% > > >> clear > > >> > > how > > >> > > > > the > > >> > > > > > > proposal would work to achieve EOS with the state stores. > > For > > >> > > > example, > > >> > > > > > the > > >> > > > > > > commit procedure today looks like this: > > >> > > > > > > > > >> > > > > > > 0: there's an existing checkpoint file indicating the > > >> changelog > > >> > > > offset > > >> > > > > of > > >> > > > > > > the local state store image is 100. Now a commit is > > triggered: > > >> > > > > > > 1. flush cache (since it contains partially processed > > >> records), > > >> > > make > > >> > > > > sure > > >> > > > > > > all records are written to the producer. > > >> > > > > > > 2. flush producer, making sure all changelog records have > > now > > >> > > acked. > > >> > > > // > > >> > > > > > > here we would get the new changelog position, say 200 > > >> > > > > > > 3. flush store, make sure all writes are persisted. > > >> > > > > > > 4. producer.sendOffsetsToTransaction(); > > >> > > producer.commitTransaction(); > > >> > > > > // > > >> > > > > > we > > >> > > > > > > would make the writes in changelog up to offset 200 > > committed > > >> > > > > > > 5. Update the checkpoint file to 200. 
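Written out as a minimal Java sketch, with comments marking the two crash windows analysed just below; the collaborator interfaces here are hypothetical stand-ins, not the actual Streams internals.

// Illustrative sketch of the commit sequence listed above; all interfaces are
// hypothetical stand-ins, named only to make the ordering explicit.
public class CommitSequenceSketch {

    interface RecordCacheLike { void flush(); }
    interface ChangelogProducer {
        long flush();                        // returns the changelog end offset after acks
        void sendOffsetsToTransaction();
        void commitTransaction();
    }
    interface StateStoreLike { void flush(); }
    interface CheckpointFileLike { void write(long offset); }

    static void commit(RecordCacheLike cache, ChangelogProducer producer,
                       StateStoreLike store, CheckpointFileLike checkpoint) {
        // 0. The existing checkpoint says the local store image is at offset 100.
        cache.flush();                       // 1. push partially processed records to the producer
        long endOffset = producer.flush();   // 2. all changelog records acked, say endOffset == 200
        store.flush();                       // 3. persist all local writes
        // Crash between 3 and 4: the local image is at 200, but changelog records
        // 100..200 are uncommitted and treated as aborted on recovery -> EOS violation.
        producer.sendOffsetsToTransaction(); // 4. commit the Kafka transaction;
        producer.commitTransaction();        //    changelog up to 200 is now committed
        // Crash between 4 and 5: checkpoint still says 100; replaying 100..200 again
        // is not ideal but safe, because changelog records are overwrites.
        checkpoint.write(endOffset);         // 5. record that the local image is at 200
    }
}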
> > >> > > > > > > > > >> > > > > > > The question about atomicity between those lines, for > > example: > > >> > > > > > > > > >> > > > > > > * If we crash between line 4 and line 5, the local > > checkpoint > > >> > file > > >> > > > > would > > >> > > > > > > stay as 100, and upon recovery we would replay the > changelog > > >> from > > >> > > 100 > > >> > > > > to > > >> > > > > > > 200. This is not ideal but does not violate EOS, since the > > >> > > changelogs > > >> > > > > are > > >> > > > > > > all overwrites anyways. > > >> > > > > > > * If we crash between line 3 and 4, then at that time the > > >> local > > >> > > > > > persistent > > >> > > > > > > store image is representing as of offset 200, but upon > > >> recovery > > >> > all > > >> > > > > > > changelog records from 100 to log-end-offset would be > > >> considered > > >> > as > > >> > > > > > aborted > > >> > > > > > > and not be replayed and we would restart processing from > > >> position > > >> > > > 100. > > >> > > > > > > Restart processing will violate EOS.I'm not sure how e.g. > > >> > RocksDB's > > >> > > > > > > WriteBatchWithIndex would make sure that the step 4 and > > step 5 > > >> > > could > > >> > > > be > > >> > > > > > > done atomically here. > > >> > > > > > > > > >> > > > > > > Originally what I was thinking when creating the JIRA > ticket > > >> is > > >> > > that > > >> > > > we > > >> > > > > > > need to let the state store to provide a transactional API > > >> like > > >> > > > "token > > >> > > > > > > commit()" used in step 4) above which returns a token, > that > > >> e.g. > > >> > in > > >> > > > our > > >> > > > > > > example above indicates offset 200, and that token would > be > > >> > written > > >> > > > as > > >> > > > > > part > > >> > > > > > > of the records in Kafka transaction in step 5). And upon > > >> recovery > > >> > > the > > >> > > > > > state > > >> > > > > > > store would have another API like "rollback(token)" where > > the > > >> > token > > >> > > > is > > >> > > > > > read > > >> > > > > > > from the latest committed txn, and be used to rollback the > > >> store > > >> > to > > >> > > > > that > > >> > > > > > > committed image. I think your proposal is different, and > it > > >> seems > > >> > > > like > > >> > > > > > > you're proposing we swap step 3) and 4) above, but the > > >> atomicity > > >> > > > issue > > >> > > > > > > still remains since now you may have the store image at > 100 > > >> but > > >> > the > > >> > > > > > > changelog is committed at 200. I'd like to learn more > about > > >> the > > >> > > > details > > >> > > > > > > on how it resolves such issues. > > >> > > > > > > > > >> > > > > > > Anyways, that's just an example to make the point that > there > > >> are > > >> > > lots > > >> > > > > of > > >> > > > > > > implementational details which would drive the public API > > >> design, > > >> > > and > > >> > > > > we > > >> > > > > > > should probably first do a POC, and come back to discuss > the > > >> KIP. > > >> > > Let > > >> > > > > me > > >> > > > > > > know what you think? > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > Guozhang > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > On Tue, May 24, 2022 at 10:35 AM Sagar < > > >> > sagarmeansoc...@gmail.com> > > >> > > > > > wrote: > > >> > > > > > > > > >> > > > > > >> Hi Alexander, > > >> > > > > > >> > > >> > > > > > >> Thanks for the KIP! 
This seems like a great proposal. I > > have > > >> the > > >> > > > same > > >> > > > > > >> opinion as John on the Configuration part though. I think > > >> the 2 > > >> > > > level > > >> > > > > > >> config and its behaviour based on the setting/unsetting > of > > >> the > > >> > > flag > > >> > > > > > seems > > >> > > > > > >> confusing to me as well. Since the KIP seems specifically > > >> > centred > > >> > > > > around > > >> > > > > > >> RocksDB it might be better to add it at the Supplier > level > > as > > >> > John > > >> > > > > > >> suggested. > > >> > > > > > >> > > >> > > > > > >> On similar lines, this config name => > > >> > > > > > *statestore.transactional.mechanism > > >> > > > > > >> *may > > >> > > > > > >> also need rethinking as the value assigned to > > >> > > it(rocksdb_indexbatch) > > >> > > > > > >> implicitly seems to assume that rocksdb is the only > > >> statestore > > >> > > that > > >> > > > > > Kafka > > >> > > > > > >> Stream supports while that's not the case. > > >> > > > > > >> > > >> > > > > > >> Also, regarding the potential memory pressure that can be > > >> > > introduced > > >> > > > > by > > >> > > > > > >> WriteBatchIndex, do you think it might make more sense to > > >> > include > > >> > > > some > > >> > > > > > >> numbers/benchmarks on how much the memory consumption > might > > >> > > > increase? > > >> > > > > > >> > > >> > > > > > >> Lastly, the read_uncommitted flag's behaviour on IQ may > > need > > >> > more > > >> > > > > > >> elaboration. > > >> > > > > > >> > > >> > > > > > >> These points aside, as I said, this is a great proposal! > > >> > > > > > >> > > >> > > > > > >> Thanks! > > >> > > > > > >> Sagar. > > >> > > > > > >> > > >> > > > > > >> On Tue, May 24, 2022 at 10:35 PM John Roesler < > > >> > > vvcep...@apache.org> > > >> > > > > > wrote: > > >> > > > > > >> > > >> > > > > > >>> Thanks for the KIP, Alex! > > >> > > > > > >>> > > >> > > > > > >>> I'm really happy to see your proposal. This improvement > > >> fills a > > >> > > > > > >>> long-standing gap. > > >> > > > > > >>> > > >> > > > > > >>> I have a few questions: > > >> > > > > > >>> > > >> > > > > > >>> 1. Configuration > > >> > > > > > >>> The KIP only mentions RocksDB, but of course, Streams > also > > >> > ships > > >> > > > with > > >> > > > > > an > > >> > > > > > >>> InMemory store, and users also plug in their own custom > > >> state > > >> > > > stores. > > >> > > > > > It > > >> > > > > > >> is > > >> > > > > > >>> also common to use multiple types of state stores in the > > >> same > > >> > > > > > application > > >> > > > > > >>> for different purposes. > > >> > > > > > >>> > > >> > > > > > >>> Against this backdrop, the choice to configure > > >> transactionality > > >> > > as > > >> > > > a > > >> > > > > > >>> top-level config, as well as to configure the store > > >> transaction > > >> > > > > > mechanism > > >> > > > > > >>> as a top-level config, seems a bit off. > > >> > > > > > >>> > > >> > > > > > >>> Did you consider instead just adding the option to the > > >> > > > > > >>> RocksDB*StoreSupplier classes and the factories in > Stores > > ? > > >> It > > >> > > > seems > > >> > > > > > like > > >> > > > > > >>> the desire to enable the feature by default, but with a > > >> > > > feature-flag > > >> > > > > to > > >> > > > > > >>> disable it was a factor here. 
However, as you pointed > out, > > >> > there > > >> > > > are > > >> > > > > > some > > >> > > > > > >>> major considerations that users should be aware of, so > > >> opt-in > > >> > > > doesn't > > >> > > > > > >> seem > > >> > > > > > >>> like a bad choice, either. You could add an Enum > argument > > to > > >> > > those > > >> > > > > > >>> factories like `RocksDBTransactionalMechanism.{NONE, > > >> > > > > > >>> > > >> > > > > > >>> Some points in favor of this approach: > > >> > > > > > >>> * Avoid "stores that don't support transactions ignore > the > > >> > > config" > > >> > > > > > >>> complexity > > >> > > > > > >>> * Users can choose how to spend their memory budget, > > making > > >> > some > > >> > > > > stores > > >> > > > > > >>> transactional and others not > > >> > > > > > >>> * When we add transactional support to in-memory stores, > > we > > >> > don't > > >> > > > > have > > >> > > > > > to > > >> > > > > > >>> figure out what to do with the mechanism config (i.e., > > what > > >> do > > >> > > you > > >> > > > > set > > >> > > > > > >> the > > >> > > > > > >>> mechanism to when there are multiple kinds of > > transactional > > >> > > stores > > >> > > > in > > >> > > > > > the > > >> > > > > > >>> topology?) > > >> > > > > > >>> > > >> > > > > > >>> 2. caching/flushing/transactions > > >> > > > > > >>> The coupling between memory usage and flushing that you > > >> > mentioned > > >> > > > is > > >> > > > > a > > >> > > > > > >> bit > > >> > > > > > >>> troubling. It also occurs to me that there seems to be > > some > > >> > > > > > relationship > > >> > > > > > >>> with the existing record cache, which is also an > in-memory > > >> > > holding > > >> > > > > area > > >> > > > > > >> for > > >> > > > > > >>> records that are not yet written to the cache and/or > store > > >> > > (albeit > > >> > > > > with > > >> > > > > > >> no > > >> > > > > > >>> particular semantics). Have you considered how all these > > >> > > components > > >> > > > > > >> should > > >> > > > > > >>> relate? For example, should a "full" WriteBatch actually > > >> > trigger > > >> > > a > > >> > > > > > flush > > >> > > > > > >> so > > >> > > > > > >>> that we don't get OOMEs? If the proposed transactional > > >> > mechanism > > >> > > > > forces > > >> > > > > > >> all > > >> > > > > > >>> uncommitted writes to be buffered in memory, until a > > commit, > > >> > then > > >> > > > > what > > >> > > > > > is > > >> > > > > > >>> the advantage over just doing the same thing with the > > >> > RecordCache > > >> > > > and > > >> > > > > > not > > >> > > > > > >>> introducing the WriteBatch at all? > > >> > > > > > >>> > > >> > > > > > >>> 3. ALOS > > >> > > > > > >>> You mentioned that a transactional store can help reduce > > >> > > > duplication > > >> > > > > in > > >> > > > > > >>> the case of ALOS. We might want to be careful about > claims > > >> like > > >> > > > that. > > >> > > > > > >>> Duplication isn't the way that repeated processing > > >> manifests in > > >> > > > state > > >> > > > > > >>> stores. Rather, it is in the form of dirty reads during > > >> > > > reprocessing. > > >> > > > > > >> This > > >> > > > > > >>> feature may reduce the incidence of dirty reads during > > >> > > > reprocessing, > > >> > > > > > but > > >> > > > > > >>> not in a predictable way. During regular processing > today, > > >> we > > >> > > will > > >> > > > > send > > >> > > > > > >>> some records through to the changelog in between commit > > >> > > intervals. 
> > >> > > > > > Under > > >> > > > > > >>> ALOS, if any of those dirty writes gets committed to the > > >> > > changelog > > >> > > > > > topic, > > >> > > > > > >>> then upon failure, we have to roll the store forward to > > them > > >> > > > anyway, > > >> > > > > > >>> regardless of this new transactional mechanism. That's a > > >> > fixable > > >> > > > > > problem, > > >> > > > > > >>> by the way, but this KIP doesn't seem to fix it. I > wonder > > >> if we > > >> > > > > should > > >> > > > > > >> make > > >> > > > > > >>> any claims about the relationship of this feature to > ALOS > > if > > >> > the > > >> > > > > > >> real-world > > >> > > > > > >>> behavior is so complex. > > >> > > > > > >>> > > >> > > > > > >>> 4. IQ > > >> > > > > > >>> As a reminder, we have a new IQv2 mechanism now. Should > we > > >> > > propose > > >> > > > > any > > >> > > > > > >>> changes to IQv1 to support this transactional mechanism, > > >> versus > > >> > > > just > > >> > > > > > >>> proposing it for IQv2? Certainly, it seems strange only > to > > >> > > propose > > >> > > > a > > >> > > > > > >> change > > >> > > > > > >>> for IQv1 and not v2. > > >> > > > > > >>> > > >> > > > > > >>> Regarding your proposal for IQv1, I'm unsure what the > > >> behavior > > >> > > > should > > >> > > > > > be > > >> > > > > > >>> for readCommitted, since the current behavior also reads > > >> out of > > >> > > the > > >> > > > > > >>> RecordCache. I guess if readCommitted==false, then we > will > > >> > > continue > > >> > > > > to > > >> > > > > > >> read > > >> > > > > > >>> from the cache first, then the Batch, then the store; > and > > if > > >> > > > > > >>> readCommitted==true, we would skip the cache and the > Batch > > >> and > > >> > > only > > >> > > > > > read > > >> > > > > > >>> from the persistent RocksDB store? > > >> > > > > > >>> > > >> > > > > > >>> What should IQ do if I request to readCommitted on a > > >> > > > > non-transactional > > >> > > > > > >>> store? > > >> > > > > > >>> > > >> > > > > > >>> Thanks again for proposing the KIP, and my apologies for > > the > > >> > long > > >> > > > > > reply; > > >> > > > > > >>> I'm hoping to air all my concerns in one "batch" to save > > >> time > > >> > for > > >> > > > > you. 
> > >> > > > > > >>> > > >> > > > > > >>> Thanks, > > >> > > > > > >>> -John > > >> > > > > > >>> > > >> > > > > > >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov > > wrote: > > >> > > > > > >>>> Hi all, > > >> > > > > > >>>> > > >> > > > > > >>>> I've written a KIP for making Kafka Streams state > stores > > >> > > > > transactional > > >> > > > > > >>> and > > >> > > > > > >>>> would like to start a discussion: > > >> > > > > > >>>> > > >> > > > > > >>>> > > >> > > > > > >>> > > >> > > > > > >> > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores > > >> > > > > > >>>> > > >> > > > > > >>>> Best, > > >> > > > > > >>>> Alex > > >> > > > > > >>> > > >> > > > > > >> > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > >> > > > > -- > > >> > > > > > > >> > > > > [image: Confluent] <https://www.confluent.io> > > >> > > > > Suhas Satish > > >> > > > > Engineering Manager > > >> > > > > Follow us: [image: Blog] > > >> > > > > < > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://www.confluent.io/blog?utm_source=footer&utm_medium=email&utm_campaign=ch.email-signature_type.community_content.blog > > >> > > > > >[image: > > >> > > > > Twitter] <https://twitter.com/ConfluentInc>[image: LinkedIn] > > >> > > > > <https://www.linkedin.com/company/confluent/> > > >> > > > > > > >> > > > > [image: Try Confluent Cloud for Free] > > >> > > > > < > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > https://www.confluent.io/get-started?utm_campaign=tm.fm-apac_cd.inbound&utm_source=gmail&utm_medium=organic > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > >> > > -- > > >> > > -- Guozhang > > >> > > > > >> > > > >> > > >> > > >> -- > > >> -- Guozhang > > >> > > > > > > > > -- > -- Guozhang >