Hey Guozhang, Bruno,

Thank you for your feedback. I am going to respond to both of you in a single email; I hope that is okay.
@Guozhang, We could, instead, have a global > config to specify if the built-in stores should be transactional or not. This was the original approach I took in this proposal. Earlier in this thread John, Sagar, and Bruno listed a number of issues with it. I tend to agree with them that it is probably better user experience to control transactionality via Materialized objects. We could simplify our implementation for `commit` Agreed! I updated the prototype and removed references to the commit marker and rolling forward from the proposal. @Bruno, So, I would remove the details about the 2-state-store implementation > from the KIP or provide it as an example of a possible implementation at > the end of the KIP. > I moved the section about the 2-state-store implementation to the bottom of the proposal and always mention it as a reference implementation. Please let me know if this is okay. Could you please describe the usage of commit() and recover() in the > commit workflow in the KIP as we did in this thread but independently > from the state store implementation? I described how commit/recover change the workflow in the Overview section. Best, Alex On Wed, Aug 10, 2022 at 10:07 AM Bruno Cadonna <cado...@apache.org> wrote: > Hi Alex, > > Thank a lot for explaining! > > Now some aspects are clearer to me. > > While I understand now, how the state store can roll forward, I have the > feeling that rolling forward is specific to the 2-state-store > implementation with RocksDB of your PoC. Other state store > implementations might use a different strategy to react to crashes. For > example, they might apply an atomic write and effectively rollback if > they crash before committing the state store transaction. I think the > KIP should not contain such implementation details but provide an > interface to accommodate rolling forward and rolling backward. > > So, I would remove the details about the 2-state-store implementation > from the KIP or provide it as an example of a possible implementation at > the end of the KIP. > > Since a state store implementation can roll forward or roll back, I > think it is fine to return the changelog offset from recover(). With the > returned changelog offset, Streams knows from where to start state store > restoration. > > Could you please describe the usage of commit() and recover() in the > commit workflow in the KIP as we did in this thread but independently > from the state store implementation? That would make things clearer. > Additionally, descriptions of failure scenarios would also be helpful. > > Best, > Bruno > > > On 04.08.22 16:39, Alexander Sorokoumov wrote: > > Hey Bruno, > > > > Thank you for the suggestions and the clarifying questions. I believe > that > > they cover the core of this proposal, so it is crucial for us to be on > the > > same page. > > > > 1. Don't you want to deprecate StateStore#flush(). > > > > > > Good call! I updated both the proposal and the prototype. > > > > 2. I would shorten Materialized#withTransactionalityEnabled() to > >> Materialized#withTransactionsEnabled(). > > > > > > Turns out, these methods are no longer necessary. I removed them from the > > proposal and the prototype. > > > > > >> 3. Could you also describe a bit more in detail where the offsets passed > >> into commit() and recover() come from? > > > > > > The offset passed into StateStore#commit is the last offset committed to > > the changelog topic. The offset passed into StateStore#recover is the > last > > checkpointed offset for the given StateStore. 
Let's look at steps 3 and 4 > > in the commit workflow. After the TaskExecutor/TaskManager commits, it > calls > > StreamTask#postCommit[1] that in turn: > > a. updates the changelog offsets via > > ProcessorStateManager#updateChangelogOffsets[2]. The offsets here come > from > > the RecordCollector[3], which tracks the latest offsets the producer sent > > without exception[4, 5]. > > b. flushes/commits the state store in AbstractTask#maybeCheckpoint[6]. > This > > method essentially calls ProcessorStateManager methods - flush/commit[7] > > and checkpoint[8]. ProcessorStateManager#commit goes over all state > stores > > that belong to that task and commits them with the offset obtained in > step > > `a`. ProcessorStateManager#checkpoint writes down those offsets for all > > state stores, except for non-transactional ones in the case of EOS. > > > > During initialization, StreamTask calls > > StateManagerUtil#registerStateStores[8] that in turn calls > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9]. At the > > moment, this method assigns checkpointed offsets to the corresponding > state > > stores[10]. The prototype also calls StateStore#recover with the > > checkpointed offset and assigns the offset returned by recover()[11]. > > > > 4. I do not quite understand how a state store can roll forward. You > >> mention in the thread the following: > > > > > > The 2-state-stores commit looks like this [12]: > > > > 1. Flush the temporary state store. > > 2. Create a commit marker with a changelog offset corresponding to > the > > state we are committing. > > 3. Go over all keys in the temporary store and write them down to the > > main one. > > 4. Wipe the temporary store. > > 5. Delete the commit marker. > > > > > > Let's consider crash failure scenarios: > > > > - Crash failure happens between steps 1 and 2. The main state store > is > > in a consistent state that corresponds to the previously checkpointed > > offset. StateStore#recover throws away the temporary store and > proceeds > > from the last checkpointed offset. > > - Crash failure happens between steps 2 and 3. We do not know what > keys > > from the temporary store were already written to the main store, so > we > > can't roll back. There are two options - either wipe the main store > or roll > > forward. Since the point of this proposal is to avoid situations > where we > > throw away the state and we do not care to what consistent state the > store > > rolls to, we roll forward by continuing from step 3. > > - Crash failure happens between steps 3 and 4. We can't distinguish > > between this and the previous scenario, so we write all the keys > from the > > temporary store. This is okay because the operation is idempotent. > > - Crash failure happens between steps 4 and 5. Again, we can't > > distinguish between this and previous scenarios, but the temporary > store is > > already empty. Even though we write all keys from the temporary > store, this > > operation is, in fact, no-op. > > - Crash failure happens between step 5 and checkpoint. This is the > case > > you referred to in question 5. The commit is finished, but it is not > > reflected at the checkpoint. recover() returns the offset of the > previous > > commit here, which is incorrect, but it is okay because we will > replay the > > changelog from the previously committed offset. As changelog replay > is > > idempotent, the state store recovers into a consistent state. 
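> >
> > To make the above concrete, here is a minimal sketch of the commit/recover
> > flow just described. The class, field, and helper names are hypothetical;
> > this is only an illustration, not the prototype's actual code:
> >
> > import java.io.IOException;
> > import java.nio.ByteBuffer;
> > import java.nio.file.Files;
> > import java.nio.file.Path;
> > import java.util.ArrayList;
> > import java.util.List;
> >
> > import org.apache.kafka.common.utils.Bytes;
> > import org.apache.kafka.streams.KeyValue;
> > import org.apache.kafka.streams.state.KeyValueIterator;
> > import org.apache.kafka.streams.state.KeyValueStore;
> >
> > // Illustrative sketch of the 2-state-store commit/recover flow described
> > // above. Names are hypothetical; this is NOT the prototype's actual code.
> > class TwoStoreCommitSketch {
> >     private final KeyValueStore<Bytes, byte[]> tmpStore;  // uncommitted writes
> >     private final KeyValueStore<Bytes, byte[]> mainStore; // committed state
> >     private final Path commitMarker;                      // holds the offset being committed
> >
> >     TwoStoreCommitSketch(final KeyValueStore<Bytes, byte[]> tmpStore,
> >                          final KeyValueStore<Bytes, byte[]> mainStore,
> >                          final Path commitMarker) {
> >         this.tmpStore = tmpStore;
> >         this.mainStore = mainStore;
> >         this.commitMarker = commitMarker;
> >     }
> >
> >     void commit(final long changelogOffset) throws IOException {
> >         tmpStore.flush();                                              // step 1
> >         Files.write(commitMarker,                                      // step 2
> >                     ByteBuffer.allocate(Long.BYTES).putLong(changelogOffset).array());
> >         copyTmpIntoMain();                                             // step 3
> >         wipeTmpStore();                                                // step 4
> >         Files.deleteIfExists(commitMarker);                            // step 5
> >     }
> >
> >     long recover(final long checkpointedOffset) throws IOException {
> >         if (!Files.exists(commitMarker)) {
> >             // Crash before step 2 (or a clean shutdown): discard uncommitted
> >             // writes and restore from the checkpointed offset.
> >             wipeTmpStore();
> >             return checkpointedOffset;
> >         }
> >         // Crash anywhere between steps 2 and 5: roll forward. Re-copying is
> >         // idempotent, and an already-wiped temporary store makes it a no-op.
> >         final long markerOffset = ByteBuffer.wrap(Files.readAllBytes(commitMarker)).getLong();
> >         copyTmpIntoMain();
> >         wipeTmpStore();
> >         Files.deleteIfExists(commitMarker);
> >         return markerOffset;
> >     }
> >
> >     private void copyTmpIntoMain() {
> >         try (final KeyValueIterator<Bytes, byte[]> it = tmpStore.all()) {
> >             while (it.hasNext()) {
> >                 final KeyValue<Bytes, byte[]> kv = it.next();
> >                 mainStore.put(kv.key, kv.value);
> >             }
> >         }
> >     }
> >
> >     private void wipeTmpStore() {
> >         final List<Bytes> keys = new ArrayList<>();
> >         try (final KeyValueIterator<Bytes, byte[]> it = tmpStore.all()) {
> >             while (it.hasNext()) {
> >                 keys.add(it.next().key);
> >             }
> >         }
> >         keys.forEach(tmpStore::delete);
> >     }
> > }
> >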
> > > > The last crash failure scenario is a natural transition to > > > > how should Streams know what to write into the checkpoint file > >> after the crash? > >> > > > > As mentioned above, the Streams app writes the checkpoint file after the > > Kafka transaction and then the StateStore commit. Same as without the > > proposal, it should write the committed offset, as it is the same for > both > > the Kafka changelog and the state store. > > > > > >> This issue arises because we store the offset outside of the state > >> store. Maybe we need an additional method on the state store interface > >> that returns the offset at which the state store is. > > > > > > In my opinion, we should include in the interface only the guarantees > that > > are necessary to preserve EOS without wiping the local state. This way, > we > > allow more room for possible implementations. Thanks to the idempotency > of > > the changelog replay, it is "good enough" if StateStore#recover returns > the > > offset that is less than what it actually is. The only limitation here is > > that the state store should never commit writes that are not yet > committed > > in Kafka changelog. > > > > Please let me know what you think about this. First of all, I am > relatively > > new to the codebase, so I might be wrong in my understanding of > > how it works. Second, while writing this, it occured to me that the > > StateStore#recover interface method is not straightforward as it can be. > > Maybe we can change it like that: > > > > /** > > * Recover a transactional state store > > * <p> > > * If a transactional state store shut down with a crash failure, > this > > method ensures that the > > * state store is in a consistent state that corresponds to {@code > > changelofOffset} or later. > > * > > * @param changelogOffset the checkpointed changelog offset. > > * @return {@code true} if recovery succeeded, {@code false} > otherwise. > > */ > > boolean recover(final Long changelogOffset) { > > > > Note: all links below except for [10] lead to the prototype's code. > > 1. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L468 > > 2. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L580 > > 3. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L868 > > 4. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L94-L96 > > 5. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L213-L216 > > 6. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L94-L97 > > 7. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L469 > > 8. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L226 > > 9. 
> > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L103 > > 10. > > > https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L251-L252 > > 11. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L250-L265 > > 12. > > > https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java#L84-L88 > > > > Best, > > Alex > > > > On Fri, Jul 29, 2022 at 3:42 PM Bruno Cadonna <cado...@apache.org> > wrote: > > > >> Hi Alex, > >> > >> Thanks for the updates! > >> > >> 1. Don't you want to deprecate StateStore#flush(). As far as I > >> understand, commit() is the new flush(), right? If you do not deprecate > >> it, you don't get rid of the error room you describe in your KIP by > >> having a flush() and a commit(). > >> > >> > >> 2. I would shorten Materialized#withTransactionalityEnabled() to > >> Materialized#withTransactionsEnabled(). > >> > >> > >> 3. Could you also describe a bit more in detail where the offsets passed > >> into commit() and recover() come from? > >> > >> > >> For my next two points, I need the commit workflow that you were so kind > >> to post into this thread: > >> > >> 1. write stuff to the state store > >> 2. producer.sendOffsetsToTransaction(token); > producer.commitTransaction(); > >> 3. flush (<- that would be call to commit(), right?) > >> 4. checkpoint > >> > >> > >> 4. I do not quite understand how a state store can roll forward. You > >> mention in the thread the following: > >> > >> "If the crash failure happens during #3, the state store can roll > >> forward and finish the flush/commit." > >> > >> How does the state store know where it stopped the flushing when it > >> crashed? > >> > >> This seems an optimization to me. I think in general the state store > >> should rollback to the last successfully committed state and restore > >> from there until the end of the changelog topic partition. The last > >> committed state is the offsets in the checkpoint file. > >> > >> > >> 5. In the same e-mail from point 4, you also state: > >> > >> "If the crash failure happens between #3 and #4, the state store should > >> do nothing during recovery and just proceed with the checkpoint." > >> > >> How should Streams know that the failure was between #3 and #4 during > >> recovery? It just sees a valid state store and a valid checkpoint file. > >> Streams does not know that the state of the checkpoint file does not > >> match with the committed state of the state store. > >> Also, how should Streams know what to write into the checkpoint file > >> after the crash? > >> This issue arises because we store the offset outside of the state > >> store. Maybe we need an additional method on the state store interface > >> that returns the offset at which the state store is. > >> > >> > >> Best, > >> Bruno > >> > >> > >> > >> > >> On 27.07.22 11:51, Alexander Sorokoumov wrote: > >>> Hey Nick, > >>> > >>> Thank you for the kind words and the feedback! 
I'll definitely add an > >>> option to configure the transactional mechanism in Stores factory > method > >>> via an argument as John previously suggested and might add the > in-memory > >>> option via RocksDB Indexed Batches if I figure why their creation via > >>> rocksdb jni fails with `UnsatisfiedLinkException`. > >>> > >>> Best, > >>> Alex > >>> > >>> On Wed, Jul 27, 2022 at 11:46 AM Alexander Sorokoumov < > >>> asorokou...@confluent.io> wrote: > >>> > >>>> Hey Guozhang, > >>>> > >>>> 1) About the param passed into the `recover()` function: it seems to > me > >>>>> that the semantics of "recover(offset)" is: recover this state to a > >>>>> transaction boundary which is at least the passed-in offset. And the > >> only > >>>>> possibility that the returned offset is different than the passed-in > >>>>> offset > >>>>> is that if the previous failure happens after we've done all the > commit > >>>>> procedures except writing the new checkpoint, in which case the > >> returned > >>>>> offset would be larger than the passed-in offset. Otherwise it should > >>>>> always be equal to the passed-in offset, is that right? > >>>> > >>>> > >>>> Right now, the only case when `recover` returns an offset different > from > >>>> the passed one is when the failure happens *during* commit. > >>>> > >>>> > >>>> If the failure happens after commit but before the checkpoint, > `recover` > >>>> might return either a passed or newer committed offset, depending on > the > >>>> implementation. The `recover` implementation in the prototype returns > a > >>>> passed offset because it deletes the commit marker that holds that > >> offset > >>>> after the commit is done. In that case, the store will replay the last > >>>> commit from the changelog. I think it is fine as the changelog replay > is > >>>> idempotent. > >>>> > >>>> 2) It seems the only use for the "transactional()" function is to > >> determine > >>>>> if we can update the checkpoint file while in EOS. > >>>> > >>>> > >>>> Right now, there are 2 other uses for `transactional()`: > >>>> 1. To determine what to do during initialization if the checkpoint is > >> gone > >>>> (see [1]). If the state store is transactional, we don't have to wipe > >> the > >>>> existing data. Thinking about it now, we do not really need this check > >>>> whether the store is `transactional` because if it is not, we'd not > have > >>>> written the checkpoint in the first place. I am going to remove that > >> check. > >>>> 2. To determine if the persistent kv store in KStreamImplJoin should > be > >>>> transactional (see [2], [3]). > >>>> > >>>> I am not sure if we can get rid of the checks in point 2. If so, I'd > be > >>>> happy to encapsulate `transactional()` logic in `commit/recover`. > >>>> > >>>> Best, > >>>> Alex > >>>> > >>>> 1. > >>>> > >> > https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281 > >>>> 2. > >>>> > >> > https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278 > >>>> 3. > >>>> > >> > https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354 > >>>> > >>>> On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <nick.telf...@gmail.com> > >>>> wrote: > >>>> > >>>>> Hi Alex, > >>>>> > >>>>> Excellent proposal, I'm very keen to see this land! 
> >>>>> > >>>>> Would it be useful to permit configuring the type of store used for > >>>>> uncommitted offsets on a store-by-store basis? This way, users could > >>>>> choose > >>>>> whether to use, e.g. an in-memory store or RocksDB, potentially > >> reducing > >>>>> the overheads associated with RocksDb for smaller stores, but without > >> the > >>>>> memory pressure issues? > >>>>> > >>>>> I suspect that in most cases, the number of uncommitted records will > be > >>>>> very small, because the default commit interval is 100ms. > >>>>> > >>>>> Regards, > >>>>> > >>>>> Nick > >>>>> > >>>>> On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com> > >> wrote: > >>>>> > >>>>>> Hello Alex, > >>>>>> > >>>>>> Thanks for the updated KIP, I looked over it and browsed the WIP and > >>>>> just > >>>>>> have a couple meta thoughts: > >>>>>> > >>>>>> 1) About the param passed into the `recover()` function: it seems to > >> me > >>>>>> that the semantics of "recover(offset)" is: recover this state to a > >>>>>> transaction boundary which is at least the passed-in offset. And the > >>>>> only > >>>>>> possibility that the returned offset is different than the passed-in > >>>>> offset > >>>>>> is that if the previous failure happens after we've done all the > >> commit > >>>>>> procedures except writing the new checkpoint, in which case the > >> returned > >>>>>> offset would be larger than the passed-in offset. Otherwise it > should > >>>>>> always be equal to the passed-in offset, is that right? > >>>>>> > >>>>>> 2) It seems the only use for the "transactional()" function is to > >>>>> determine > >>>>>> if we can update the checkpoint file while in EOS. But the purpose > of > >>>>> the > >>>>>> checkpoint file's offsets is just to tell "the local state's current > >>>>>> snapshot's progress is at least the indicated offsets" anyways, and > >> with > >>>>>> this KIP maybe we would just do: > >>>>>> > >>>>>> a) when in ALOS, upon failover: we set the starting offset as > >>>>>> checkpointed-offset, then restore() from changelog till the > >> end-offset. > >>>>>> This way we may restore some records twice. > >>>>>> b) when in EOS, upon failover: we first call > >>>>> recover(checkpointed-offset), > >>>>>> then set the starting offset as the returned offset (which may be > >> larger > >>>>>> than checkpointed-offset), then restore until the end-offset. > >>>>>> > >>>>>> So why not also: > >>>>>> c) we let the `commit()` function to also return an offset, which > >>>>> indicates > >>>>>> "checkpointable offsets". > >>>>>> d) for existing non-transactional stores, we just have a default > >>>>>> implementation of "commit()" which is simply a flush, and returns a > >>>>>> sentinel value like -1. Then later if we get checkpointable offsets > >> -1, > >>>>> we > >>>>>> do not write the checkpoint. Upon clean shutting down we can just > >>>>>> checkpoint regardless of the returned value from "commit". > >>>>>> e) for existing non-transactional stores, we just have a default > >>>>>> implementation of "recover()" which is to wipe out the local store > and > >>>>>> return offset 0 if the passed in offset is -1, otherwise if not -1 > >> then > >>>>> it > >>>>>> indicates a clean shutdown in the last run, can this function is > just > >> a > >>>>>> no-op. > >>>>>> > >>>>>> In that case, we would not need the "transactional()" function > >> anymore, > >>>>>> since for non-transactional stores their behaviors are still wrapped > >> in > >>>>> the > >>>>>> `commit / recover` function pairs. 
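> >>>>>>
> >>>>>> To illustrate d) and e), a rough sketch of such defaults could look like
> >>>>>> the following (the signatures and names are made up for illustration and
> >>>>>> are not the KIP's actual API):
> >>>>>>
> >>>>>> interface StateStoreDefaultsSketch {
> >>>>>>     void flush();           // existing flush semantics
> >>>>>>     void wipeLocalState();  // hypothetical helper that deletes local data
> >>>>>>
> >>>>>>     default long commit(final long changelogOffset) {
> >>>>>>         flush();            // a non-transactional store just flushes ...
> >>>>>>         return -1L;         // ... and returns a sentinel: no checkpointable offset
> >>>>>>     }
> >>>>>>
> >>>>>>     default long recover(final long checkpointedOffset) {
> >>>>>>         if (checkpointedOffset == -1L) {
> >>>>>>             wipeLocalState();      // unclean EOS shutdown: wipe and replay
> >>>>>>             return 0L;             // restore the changelog from the beginning
> >>>>>>         }
> >>>>>>         return checkpointedOffset; // clean shutdown last run: no-op
> >>>>>>     }
> >>>>>> }
> >>>>>>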
> >>>>>> > >>>>>> I have not completed the thorough pass on your WIP PR, so maybe I > >> could > >>>>>> come up with some more feedback later, but just let me know if my > >>>>>> understanding above is correct or not? > >>>>>> > >>>>>> > >>>>>> Guozhang > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov > >>>>>> <asorokou...@confluent.io.invalid> wrote: > >>>>>> > >>>>>>> Hi, > >>>>>>> > >>>>>>> I updated the KIP with the following changes: > >>>>>>> * Replaced in-memory batches with the secondary-store approach as > the > >>>>>>> default implementation to address the feedback about memory > pressure > >>>>> as > >>>>>>> suggested by Sagar and Bruno. > >>>>>>> * Introduced StateStore#commit and StateStore#recover methods as an > >>>>>>> extension of the rollback idea. @Guozhang, please see the comment > >>>>> below > >>>>>> on > >>>>>>> why I took a slightly different approach than you suggested. > >>>>>>> * Removed mentions of changes to IQv1 and IQv2. Transactional state > >>>>>> stores > >>>>>>> enable reading committed in IQ, but it is really an independent > >>>>> feature > >>>>>>> that deserves its own KIP. Conflating them unnecessarily increases > >> the > >>>>>>> scope for discussion, implementation, and testing in a single unit > of > >>>>>> work. > >>>>>>> > >>>>>>> I also published a prototype - > >>>>>> https://github.com/apache/kafka/pull/12393 > >>>>>>> that implements changes described in the proposal. > >>>>>>> > >>>>>>> Regarding explicit rollback, I think it is a powerful idea that > >> allows > >>>>>>> other StateStore implementations to take a different path to the > >>>>>>> transactional behavior rather than keep 2 state stores. Instead of > >>>>>>> introducing a new commit token, I suggest using a changelog offset > >>>>> that > >>>>>>> already 1:1 corresponds to the materialized state. This works > nicely > >>>>>>> because Kafka Stream first commits an AK transaction and only then > >>>>>>> checkpoints the state store, so we can use the changelog offset to > >>>>> commit > >>>>>>> the state store transaction. > >>>>>>> > >>>>>>> I called the method StateStore#recover rather than > >> StateStore#rollback > >>>>>>> because a state store might either roll back or forward depending > on > >>>>> the > >>>>>>> specific point of the crash failure.Consider the write algorithm in > >>>>> Kafka > >>>>>>> Streams is: > >>>>>>> 1. write stuff to the state store > >>>>>>> 2. producer.sendOffsetsToTransaction(token); > >>>>>> producer.commitTransaction(); > >>>>>>> 3. flush > >>>>>>> 4. checkpoint > >>>>>>> > >>>>>>> Let's consider 3 cases: > >>>>>>> 1. If the crash failure happens between #2 and #3, the state store > >>>>> rolls > >>>>>>> back and replays the uncommitted transaction from the changelog. > >>>>>>> 2. If the crash failure happens during #3, the state store can roll > >>>>>> forward > >>>>>>> and finish the flush/commit. > >>>>>>> 3. If the crash failure happens between #3 and #4, the state store > >>>>> should > >>>>>>> do nothing during recovery and just proceed with the checkpoint. 
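> >>>>>>>
> >>>>>>> As a purely illustrative sketch of where commit() and recover() sit in
> >>>>>>> that workflow (this is not the actual StreamTask/ProcessorStateManager
> >>>>>>> code, and the names are made up):
> >>>>>>>
> >>>>>>> class CommitWorkflowSketch {
> >>>>>>>     interface TxStore {                    // stand-in for the proposed StateStore additions
> >>>>>>>         void commit(long changelogOffset);
> >>>>>>>         long recover(long checkpointedOffset);
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     private final TxStore store;
> >>>>>>>
> >>>>>>>     CommitWorkflowSketch(final TxStore store) {
> >>>>>>>         this.store = store;
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     long commit(final long committedChangelogOffset) {
> >>>>>>>         // Steps 1-2 happen before this point: writes went to the store during
> >>>>>>>         // processing, and the caller committed the Kafka transaction
> >>>>>>>         // (producer.sendOffsetsToTransaction + producer.commitTransaction).
> >>>>>>>         store.commit(committedChangelogOffset); // step 3: flush/commit the store
> >>>>>>>         return committedChangelogOffset;        // step 4: caller checkpoints this offset
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     long restoreFrom(final long checkpointedOffset) {
> >>>>>>>         // Crash between #2 and #3: recover() rolls the store back (or forward if
> >>>>>>>         // the crash happened mid-commit) and the changelog is replayed from the
> >>>>>>>         // returned offset.
> >>>>>>>         // Crash between #3 and #4: replaying from the stale checkpointed offset
> >>>>>>>         // is safe because changelog replay is idempotent.
> >>>>>>>         return store.recover(checkpointedOffset);
> >>>>>>>     }
> >>>>>>> }
> >>>>>>>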
> >>>>>>> > >>>>>>> Looking forward to your feedback, > >>>>>>> Alexander > >>>>>>> > >>>>>>> On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov < > >>>>>>> asorokou...@confluent.io> wrote: > >>>>>>> > >>>>>>>> Hi, > >>>>>>>> > >>>>>>>> As a status update, I did the following changes to the KIP: > >>>>>>>> * replaced configuration via the top-level config with > configuration > >>>>>> via > >>>>>>>> Stores factory and StoreSuppliers, > >>>>>>>> * added IQv2 and elaborated how readCommitted will work when the > >>>>> store > >>>>>> is > >>>>>>>> not transactional, > >>>>>>>> * removed claims about ALOS. > >>>>>>>> > >>>>>>>> I am going to be OOO in the next couple of weeks and will resume > >>>>>> working > >>>>>>>> on the proposal and responding to the discussion in this thread > >>>>>> starting > >>>>>>>> June 27. My next top priorities are: > >>>>>>>> 1. Prototype the rollback approach as suggested by Guozhang. > >>>>>>>> 2. Replace in-memory batches with the secondary-store approach as > >>>>> the > >>>>>>>> default implementation to address the feedback about memory > >>>>> pressure as > >>>>>>>> suggested by Sagar and Bruno. > >>>>>>>> 3. Adjust Stores methods to make transactional implementations > >>>>>> pluggable. > >>>>>>>> 4. Publish the POC for the first review. > >>>>>>>> > >>>>>>>> Best regards, > >>>>>>>> Alex > >>>>>>>> > >>>>>>>> On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com> > >>>>>> wrote: > >>>>>>>> > >>>>>>>>> Alex, > >>>>>>>>> > >>>>>>>>> Thanks for your replies! That is very helpful. > >>>>>>>>> > >>>>>>>>> Just to broaden our discussions a bit here, I think there are > some > >>>>>> other > >>>>>>>>> approaches in parallel to the idea of "enforce to only persist > upon > >>>>>>>>> explicit flush" and I'd like to throw one here -- not really > >>>>>> advocating > >>>>>>>>> it, > >>>>>>>>> but just for us to compare the pros and cons: > >>>>>>>>> > >>>>>>>>> 1) We let the StateStore's `flush` function to return a token > >>>>> instead > >>>>>> of > >>>>>>>>> returning `void`. > >>>>>>>>> 2) We add another `rollback(token)` interface of StateStore which > >>>>>> would > >>>>>>>>> effectively rollback the state as indicated by the token to the > >>>>>> snapshot > >>>>>>>>> when the corresponding `flush` is called. > >>>>>>>>> 3) We encode the token and commit as part of > >>>>>>>>> `producer#sendOffsetsToTransaction`. > >>>>>>>>> > >>>>>>>>> Users could optionally implement the new functions, or they can > >>>>> just > >>>>>> not > >>>>>>>>> return the token at all and not implement the second function. > >>>>> Again, > >>>>>>> the > >>>>>>>>> APIs are just for the sake of illustration, not feeling they are > >>>>> the > >>>>>>> most > >>>>>>>>> natural :) > >>>>>>>>> > >>>>>>>>> Then the procedure would be: > >>>>>>>>> > >>>>>>>>> 1. the previous checkpointed offset is 100 > >>>>>>>>> ... > >>>>>>>>> 3. flush store, make sure all writes are persisted; get the > >>>>> returned > >>>>>>> token > >>>>>>>>> that indicates the snapshot of 200. > >>>>>>>>> 4. producer.sendOffsetsToTransaction(token); > >>>>>>> producer.commitTransaction(); > >>>>>>>>> 5. Update the checkpoint file (say, the new value is 200). 
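> >>>>>>>>>
> >>>>>>>>> Purely as an illustration of 1)-3) above (names are made up, not a
> >>>>>>>>> concrete API proposal), the token-based alternative could look like:
> >>>>>>>>>
> >>>>>>>>> interface TokenStoreSketch {
> >>>>>>>>>     // flush() returns an opaque token identifying the flushed snapshot
> >>>>>>>>>     byte[] flushAndGetToken();
> >>>>>>>>>
> >>>>>>>>>     // restores the store to the snapshot identified by the token
> >>>>>>>>>     void rollback(byte[] token);
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> // The token would then be committed together with the offsets, e.g. encoded
> >>>>>>>>> // in the offset metadata passed to producer.sendOffsetsToTransaction(), and
> >>>>>>>>> // read back from the last committed transaction on recovery to decide
> >>>>>>>>> // whether rollback(token) is needed.
> >>>>>>>>>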
> >>>>>>>>> > >>>>>>>>> Then if there's a failure, say between 3/4, we would get the > token > >>>>>> from > >>>>>>>>> the > >>>>>>>>> last committed txn, and first we would do the restoration (which > >>>>> may > >>>>>> get > >>>>>>>>> the state to somewhere between 100 and 200), then call > >>>>>>>>> `store.rollback(token)` to rollback to the snapshot of offset > 100. > >>>>>>>>> > >>>>>>>>> The pros is that we would then not need to enforce the state > >>>>> stores to > >>>>>>> not > >>>>>>>>> persist any data during the txn: for stores that may not be able > to > >>>>>>>>> implement the `rollback` function, they can still reduce its impl > >>>>> to > >>>>>>> "not > >>>>>>>>> persisting any data" via this API, but for stores that can indeed > >>>>>>> support > >>>>>>>>> the rollback, their implementation may be more efficient. The > cons > >>>>>>> though, > >>>>>>>>> on top of my head are 1) more complicated logic differentiating > >>>>>> between > >>>>>>>>> EOS > >>>>>>>>> with and without store rollback support, and ALOS, 2) encoding > the > >>>>>> token > >>>>>>>>> as > >>>>>>>>> part of the commit offset is not ideal if it is big, 3) the > >>>>> recovery > >>>>>>> logic > >>>>>>>>> including the state store is also a bit more complicated. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> Guozhang > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov > >>>>>>>>> <asorokou...@confluent.io.invalid> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Guozhang, > >>>>>>>>>> > >>>>>>>>>> But I'm still trying to clarify how it guarantees EOS, and it > >>>>> seems > >>>>>>>>> that we > >>>>>>>>>>> would achieve it by enforcing to not persist any data written > >>>>>> within > >>>>>>>>> this > >>>>>>>>>>> transaction until step 4. Is that correct? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> This is correct. Both alternatives - in-memory > >>>>> WriteBatchWithIndex > >>>>>> and > >>>>>>>>>> transactionality via the secondary store guarantee EOS by not > >>>>>>> persisting > >>>>>>>>>> data in the "main" state store until it is committed in the > >>>>>> changelog > >>>>>>>>>> topic. > >>>>>>>>>> > >>>>>>>>>> Oh what I meant is not what KStream code does, but that > >>>>> StateStore > >>>>>>> impl > >>>>>>>>>>> classes themselves could potentially flush data to become > >>>>>> persisted > >>>>>>>>>>> asynchronously > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Thank you for elaborating! You are correct, the underlying state > >>>>>> store > >>>>>>>>>> should not persist data until the streams app calls > >>>>>> StateStore#flush. > >>>>>>>>> There > >>>>>>>>>> are 2 options how a State Store implementation can guarantee > >>>>> that - > >>>>>>>>> either > >>>>>>>>>> keep uncommitted writes in memory or be able to roll back the > >>>>>> changes > >>>>>>>>> that > >>>>>>>>>> were not committed during recovery. RocksDB's > >>>>> WriteBatchWithIndex is > >>>>>>> an > >>>>>>>>>> implementation of the first option. A considered alternative, > >>>>>>>>> Transactions > >>>>>>>>>> via Secondary State Store for Uncommitted Changes, is the way to > >>>>>>>>> implement > >>>>>>>>>> the second option. > >>>>>>>>>> > >>>>>>>>>> As everyone correctly pointed out, keeping uncommitted data in > >>>>>> memory > >>>>>>>>>> introduces a very real risk of OOM that we will need to handle. 
> >>>>> The > >>>>>>>>> more I > >>>>>>>>>> think about it, the more I lean towards going with the > >>>>> Transactions > >>>>>>> via > >>>>>>>>>> Secondary Store as the way to implement transactionality as it > >>>>> does > >>>>>>> not > >>>>>>>>>> have that issue. > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Alex > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang < > >>>>> wangg...@gmail.com> > >>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hello Alex, > >>>>>>>>>>> > >>>>>>>>>>>> we flush the cache, but not the underlying state store. > >>>>>>>>>>> > >>>>>>>>>>> You're right. The ordering I mentioned above is actually: > >>>>>>>>>>> > >>>>>>>>>>> ... > >>>>>>>>>>> 3. producer.sendOffsetsToTransaction(); > >>>>>>> producer.commitTransaction(); > >>>>>>>>>>> 4. flush store, make sure all writes are persisted. > >>>>>>>>>>> 5. Update the checkpoint file to 200. > >>>>>>>>>>> > >>>>>>>>>>> But I'm still trying to clarify how it guarantees EOS, and it > >>>>>> seems > >>>>>>>>> that > >>>>>>>>>> we > >>>>>>>>>>> would achieve it by enforcing to not persist any data written > >>>>>> within > >>>>>>>>> this > >>>>>>>>>>> transaction until step 4. Is that correct? > >>>>>>>>>>> > >>>>>>>>>>>> Can you please point me to the place in the codebase where we > >>>>>>>>> trigger > >>>>>>>>>>> async flush before the commit? > >>>>>>>>>>> > >>>>>>>>>>> Oh what I meant is not what KStream code does, but that > >>>>> StateStore > >>>>>>>>> impl > >>>>>>>>>>> classes themselves could potentially flush data to become > >>>>>> persisted > >>>>>>>>>>> asynchronously, e.g. RocksDB does that naturally out of the > >>>>>> control > >>>>>>> of > >>>>>>>>>>> KStream code. I think it is related to my previous question: > >>>>> if we > >>>>>>>>> think > >>>>>>>>>> by > >>>>>>>>>>> guaranteeing EOS at the state store level, we would effectively > >>>>>> ask > >>>>>>>>> the > >>>>>>>>>>> impl classes that "you should not persist any data until > >>>>> `flush` > >>>>>> is > >>>>>>>>>> called > >>>>>>>>>>> explicitly", is the StateStore interface the right level to > >>>>>> enforce > >>>>>>>>> such > >>>>>>>>>>> mechanisms, or should we just do that on top of the > >>>>> StateStores, > >>>>>>> e.g. > >>>>>>>>>>> during the transaction we just keep all the writes in the cache > >>>>>> (of > >>>>>>>>>> course > >>>>>>>>>>> we need to consider how to work around memory pressure as > >>>>>> previously > >>>>>>>>>>> mentioned), and then upon committing, we just write the cached > >>>>>>> records > >>>>>>>>>> as a > >>>>>>>>>>> whole into the store and then call flush. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Guozhang > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov > >>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hey, > >>>>>>>>>>>> > >>>>>>>>>>>> Thank you for the wealth of great suggestions and questions! > >>>>> I > >>>>>> am > >>>>>>>>> going > >>>>>>>>>>> to > >>>>>>>>>>>> address the feedback in batches and update the proposal > >>>>> async, > >>>>>> as > >>>>>>>>> it is > >>>>>>>>>>>> probably going to be easier for everyone. I will also write a > >>>>>>>>> separate > >>>>>>>>>>>> message after making updates to the KIP. > >>>>>>>>>>>> > >>>>>>>>>>>> @John, > >>>>>>>>>>>> > >>>>>>>>>>>>> Did you consider instead just adding the option to the > >>>>>>>>>>>>> RocksDB*StoreSupplier classes and the factories in Stores ? 
> >>>>>>>>>>>> > >>>>>>>>>>>> Thank you for suggesting that. I think that this idea is > >>>>> better > >>>>>>> than > >>>>>>>>>>> what I > >>>>>>>>>>>> came up with and will update the KIP with configuring > >>>>>>>>> transactionality > >>>>>>>>>>> via > >>>>>>>>>>>> the suppliers and Stores. > >>>>>>>>>>>> > >>>>>>>>>>>> what is the advantage over just doing the same thing with the > >>>>>>>>>> RecordCache > >>>>>>>>>>>>> and not introducing the WriteBatch at all? > >>>>>>>>>>>> > >>>>>>>>>>>> Can you point me to RecordCache? I can't find it in the > >>>>> project. > >>>>>>> The > >>>>>>>>>>>> advantage would be that WriteBatch guarantees write > >>>>> atomicity. > >>>>>> As > >>>>>>>>> far > >>>>>>>>>> as > >>>>>>>>>>> I > >>>>>>>>>>>> understood the way RecordCache works, it might leave the > >>>>> system > >>>>>> in > >>>>>>>>> an > >>>>>>>>>>>> inconsistent state during crash failure on write. > >>>>>>>>>>>> > >>>>>>>>>>>> You mentioned that a transactional store can help reduce > >>>>>>>>> duplication in > >>>>>>>>>>> the > >>>>>>>>>>>>> case of ALOS > >>>>>>>>>>>> > >>>>>>>>>>>> I will remove claims about ALOS from the proposal. Thank you > >>>>> for > >>>>>>>>>>>> elaborating! > >>>>>>>>>>>> > >>>>>>>>>>>> As a reminder, we have a new IQv2 mechanism now. Should we > >>>>>> propose > >>>>>>>>> any > >>>>>>>>>>>>> changes to IQv1 to support this transactional mechanism, > >>>>>> versus > >>>>>>>>> just > >>>>>>>>>>>>> proposing it for IQv2? Certainly, it seems strange only to > >>>>>>>>> propose a > >>>>>>>>>>>> change > >>>>>>>>>>>>> for IQv1 and not v2. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> I will update the proposal with complementary API changes > >>>>> for > >>>>>>> IQv2 > >>>>>>>>>>>> > >>>>>>>>>>>> What should IQ do if I request to readCommitted on a > >>>>>>>>> non-transactional > >>>>>>>>>>>>> store? > >>>>>>>>>>>> > >>>>>>>>>>>> We can assume that non-transactional stores commit on write, > >>>>> so > >>>>>> IQ > >>>>>>>>>> works > >>>>>>>>>>> in > >>>>>>>>>>>> the same way with non-transactional stores regardless of the > >>>>>> value > >>>>>>>>> of > >>>>>>>>>>>> readCommitted. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> @Guozhang, > >>>>>>>>>>>> > >>>>>>>>>>>> * If we crash between line 3 and 4, then at that time the > >>>>> local > >>>>>>>>>>> persistent > >>>>>>>>>>>>> store image is representing as of offset 200, but upon > >>>>>> recovery > >>>>>>>>> all > >>>>>>>>>>>>> changelog records from 100 to log-end-offset would be > >>>>>> considered > >>>>>>>>> as > >>>>>>>>>>>> aborted > >>>>>>>>>>>>> and not be replayed and we would restart processing from > >>>>>>> position > >>>>>>>>>> 100. > >>>>>>>>>>>>> Restart processing will violate EOS.I'm not sure how e.g. > >>>>>>>>> RocksDB's > >>>>>>>>>>>>> WriteBatchWithIndex would make sure that the step 4 and > >>>>> step 5 > >>>>>>>>> could > >>>>>>>>>> be > >>>>>>>>>>>>> done atomically here. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Could you please point me to the place in the codebase where > >>>>> a > >>>>>>> task > >>>>>>>>>>> flushes > >>>>>>>>>>>> the store before committing the transaction? 
> >>>>>>>>>>>> Looking at TaskExecutor ( > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167 > >>>>>>>>>>>> ), > >>>>>>>>>>>> StreamTask#prepareCommit ( > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398 > >>>>>>>>>>>> ), > >>>>>>>>>>>> and CachedStateStore ( > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34 > >>>>>>>>>>>> ) > >>>>>>>>>>>> we flush the cache, but not the underlying state store. > >>>>> Explicit > >>>>>>>>>>>> StateStore#flush happens in > >>>>> AbstractTask#maybeWriteCheckpoint ( > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99 > >>>>>>>>>>>> ). > >>>>>>>>>>>> Is there something I am missing here? > >>>>>>>>>>>> > >>>>>>>>>>>> Today all cached data that have not been flushed are not > >>>>>> committed > >>>>>>>>> for > >>>>>>>>>>>>> sure, but even flushed data to the persistent underlying > >>>>> store > >>>>>>> may > >>>>>>>>>> also > >>>>>>>>>>>> be > >>>>>>>>>>>>> uncommitted since flushing can be triggered asynchronously > >>>>>>> before > >>>>>>>>> the > >>>>>>>>>>>>> commit. > >>>>>>>>>>>> > >>>>>>>>>>>> Can you please point me to the place in the codebase where we > >>>>>>>>> trigger > >>>>>>>>>>> async > >>>>>>>>>>>> flush before the commit? This would certainly be a reason to > >>>>>>>>> introduce > >>>>>>>>>> a > >>>>>>>>>>>> dedicated StateStore#commit method. > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks again for the feedback. I am going to update the KIP > >>>>> and > >>>>>>> then > >>>>>>>>>>>> respond to the next batch of questions and suggestions. > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> Alex > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, May 30, 2022 at 5:13 PM Suhas Satish > >>>>>>>>>>> <ssat...@confluent.io.invalid > >>>>>>>>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the KIP proposal Alex. > >>>>>>>>>>>>> 1. Configuration default > >>>>>>>>>>>>> > >>>>>>>>>>>>> You mention applications using streams DSL with built-in > >>>>>> rocksDB > >>>>>>>>>> state > >>>>>>>>>>>>> store will get transactional state stores by default when > >>>>> EOS > >>>>>> is > >>>>>>>>>>> enabled, > >>>>>>>>>>>>> but the default implementation for apps using PAPI will > >>>>>> fallback > >>>>>>>>> to > >>>>>>>>>>>>> non-transactional behavior. > >>>>>>>>>>>>> Shouldn't we have the same default behavior for both types > >>>>> of > >>>>>>>>> apps - > >>>>>>>>>>> DSL > >>>>>>>>>>>>> and PAPI? > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna < > >>>>>>> cado...@apache.org > >>>>>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks for the PR, Alex! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I am also glad to see this coming. 
> >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1. Configuration > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I would also prefer to restrict the configuration of > >>>>>>>>> transactional > >>>>>>>>>> on > >>>>>>>>>>>>>> the state sore. Ideally, calling method transactional() > >>>>> on > >>>>>> the > >>>>>>>>>> state > >>>>>>>>>>>>>> store would be enough. An option on the store builder > >>>>> would > >>>>>>>>> make it > >>>>>>>>>>>>>> possible to turn transactionality on and off (as John > >>>>>>> proposed). > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 2. Memory usage in RocksDB > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> This seems to be a major issue. We do not have any > >>>>> guarantee > >>>>>>>>> that > >>>>>>>>>>>>>> uncommitted writes fit into memory and I guess we will > >>>>> never > >>>>>>>>> have. > >>>>>>>>>>> What > >>>>>>>>>>>>>> happens when the uncommitted writes do not fit into > >>>>> memory? > >>>>>>> Does > >>>>>>>>>>>> RocksDB > >>>>>>>>>>>>>> throw an exception? Can we handle such an exception > >>>>> without > >>>>>>>>>> crashing? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Does the RocksDB behavior even need to be included in > >>>>> this > >>>>>>> KIP? > >>>>>>>>> In > >>>>>>>>>>> the > >>>>>>>>>>>>>> end it is an implementation detail. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> What we should consider - though - is a memory limit in > >>>>> some > >>>>>>>>> form. > >>>>>>>>>>> And > >>>>>>>>>>>>>> what we do when the memory limit is exceeded. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 3. PoC > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I agree with Guozhang that a PoC is a good idea to better > >>>>>>>>>> understand > >>>>>>>>>>>> the > >>>>>>>>>>>>>> devils in the details. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Bruno > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On 25.05.22 01:52, Guozhang Wang wrote: > >>>>>>>>>>>>>>> Hello Alex, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for writing the proposal! Glad to see it > >>>>> coming. I > >>>>>>>>> think > >>>>>>>>>>> this > >>>>>>>>>>>> is > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>> kind of a KIP that since too many devils would be > >>>>> buried > >>>>>> in > >>>>>>>>> the > >>>>>>>>>>>> details > >>>>>>>>>>>>>> and > >>>>>>>>>>>>>>> it's better to start working on a POC, either in > >>>>> parallel, > >>>>>>> or > >>>>>>>>>>> before > >>>>>>>>>>>> we > >>>>>>>>>>>>>>> resume our discussion, rather than blocking any > >>>>>>> implementation > >>>>>>>>>>> until > >>>>>>>>>>>> we > >>>>>>>>>>>>>> are > >>>>>>>>>>>>>>> satisfied with the proposal. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Just as a concrete example, I personally am still not > >>>>> 100% > >>>>>>>>> clear > >>>>>>>>>>> how > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> proposal would work to achieve EOS with the state > >>>>> stores. > >>>>>>> For > >>>>>>>>>>>> example, > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>> commit procedure today looks like this: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 0: there's an existing checkpoint file indicating the > >>>>>>>>> changelog > >>>>>>>>>>>> offset > >>>>>>>>>>>>> of > >>>>>>>>>>>>>>> the local state store image is 100. Now a commit is > >>>>>>> triggered: > >>>>>>>>>>>>>>> 1. flush cache (since it contains partially processed > >>>>>>>>> records), > >>>>>>>>>>> make > >>>>>>>>>>>>> sure > >>>>>>>>>>>>>>> all records are written to the producer. > >>>>>>>>>>>>>>> 2. flush producer, making sure all changelog records > >>>>> have > >>>>>>> now > >>>>>>>>>>> acked. 
> >>>>>>>>>>>> // > >>>>>>>>>>>>>>> here we would get the new changelog position, say 200 > >>>>>>>>>>>>>>> 3. flush store, make sure all writes are persisted. > >>>>>>>>>>>>>>> 4. producer.sendOffsetsToTransaction(); > >>>>>>>>>>> producer.commitTransaction(); > >>>>>>>>>>>>> // > >>>>>>>>>>>>>> we > >>>>>>>>>>>>>>> would make the writes in changelog up to offset 200 > >>>>>>> committed > >>>>>>>>>>>>>>> 5. Update the checkpoint file to 200. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> The question about atomicity between those lines, for > >>>>>>> example: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> * If we crash between line 4 and line 5, the local > >>>>>>> checkpoint > >>>>>>>>>> file > >>>>>>>>>>>>> would > >>>>>>>>>>>>>>> stay as 100, and upon recovery we would replay the > >>>>>> changelog > >>>>>>>>> from > >>>>>>>>>>> 100 > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>> 200. This is not ideal but does not violate EOS, since > >>>>> the > >>>>>>>>>>> changelogs > >>>>>>>>>>>>> are > >>>>>>>>>>>>>>> all overwrites anyways. > >>>>>>>>>>>>>>> * If we crash between line 3 and 4, then at that time > >>>>> the > >>>>>>>>> local > >>>>>>>>>>>>>> persistent > >>>>>>>>>>>>>>> store image is representing as of offset 200, but upon > >>>>>>>>> recovery > >>>>>>>>>> all > >>>>>>>>>>>>>>> changelog records from 100 to log-end-offset would be > >>>>>>>>> considered > >>>>>>>>>> as > >>>>>>>>>>>>>> aborted > >>>>>>>>>>>>>>> and not be replayed and we would restart processing > >>>>> from > >>>>>>>>> position > >>>>>>>>>>>> 100. > >>>>>>>>>>>>>>> Restart processing will violate EOS.I'm not sure how > >>>>> e.g. > >>>>>>>>>> RocksDB's > >>>>>>>>>>>>>>> WriteBatchWithIndex would make sure that the step 4 and > >>>>>>> step 5 > >>>>>>>>>>> could > >>>>>>>>>>>> be > >>>>>>>>>>>>>>> done atomically here. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Originally what I was thinking when creating the JIRA > >>>>>> ticket > >>>>>>>>> is > >>>>>>>>>>> that > >>>>>>>>>>>> we > >>>>>>>>>>>>>>> need to let the state store to provide a transactional > >>>>> API > >>>>>>>>> like > >>>>>>>>>>>> "token > >>>>>>>>>>>>>>> commit()" used in step 4) above which returns a token, > >>>>>> that > >>>>>>>>> e.g. > >>>>>>>>>> in > >>>>>>>>>>>> our > >>>>>>>>>>>>>>> example above indicates offset 200, and that token > >>>>> would > >>>>>> be > >>>>>>>>>> written > >>>>>>>>>>>> as > >>>>>>>>>>>>>> part > >>>>>>>>>>>>>>> of the records in Kafka transaction in step 5). And > >>>>> upon > >>>>>>>>> recovery > >>>>>>>>>>> the > >>>>>>>>>>>>>> state > >>>>>>>>>>>>>>> store would have another API like "rollback(token)" > >>>>> where > >>>>>>> the > >>>>>>>>>> token > >>>>>>>>>>>> is > >>>>>>>>>>>>>> read > >>>>>>>>>>>>>>> from the latest committed txn, and be used to rollback > >>>>> the > >>>>>>>>> store > >>>>>>>>>> to > >>>>>>>>>>>>> that > >>>>>>>>>>>>>>> committed image. I think your proposal is different, > >>>>> and > >>>>>> it > >>>>>>>>> seems > >>>>>>>>>>>> like > >>>>>>>>>>>>>>> you're proposing we swap step 3) and 4) above, but the > >>>>>>>>> atomicity > >>>>>>>>>>>> issue > >>>>>>>>>>>>>>> still remains since now you may have the store image at > >>>>>> 100 > >>>>>>>>> but > >>>>>>>>>> the > >>>>>>>>>>>>>>> changelog is committed at 200. I'd like to learn more > >>>>>> about > >>>>>>>>> the > >>>>>>>>>>>> details > >>>>>>>>>>>>>>> on how it resolves such issues. 
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Anyways, that's just an example to make the point that > >>>>>> there > >>>>>>>>> are > >>>>>>>>>>> lots > >>>>>>>>>>>>> of > >>>>>>>>>>>>>>> implementational details which would drive the public > >>>>> API > >>>>>>>>> design, > >>>>>>>>>>> and > >>>>>>>>>>>>> we > >>>>>>>>>>>>>>> should probably first do a POC, and come back to > >>>>> discuss > >>>>>> the > >>>>>>>>> KIP. > >>>>>>>>>>> Let > >>>>>>>>>>>>> me > >>>>>>>>>>>>>>> know what you think? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, May 24, 2022 at 10:35 AM Sagar < > >>>>>>>>>> sagarmeansoc...@gmail.com> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Alexander, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for the KIP! This seems like a great proposal. > >>>>> I > >>>>>>> have > >>>>>>>>> the > >>>>>>>>>>>> same > >>>>>>>>>>>>>>>> opinion as John on the Configuration part though. I > >>>>> think > >>>>>>>>> the 2 > >>>>>>>>>>>> level > >>>>>>>>>>>>>>>> config and its behaviour based on the > >>>>> setting/unsetting > >>>>>> of > >>>>>>>>> the > >>>>>>>>>>> flag > >>>>>>>>>>>>>> seems > >>>>>>>>>>>>>>>> confusing to me as well. Since the KIP seems > >>>>> specifically > >>>>>>>>>> centred > >>>>>>>>>>>>> around > >>>>>>>>>>>>>>>> RocksDB it might be better to add it at the Supplier > >>>>>> level > >>>>>>> as > >>>>>>>>>> John > >>>>>>>>>>>>>>>> suggested. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On similar lines, this config name => > >>>>>>>>>>>>>> *statestore.transactional.mechanism > >>>>>>>>>>>>>>>> *may > >>>>>>>>>>>>>>>> also need rethinking as the value assigned to > >>>>>>>>>>> it(rocksdb_indexbatch) > >>>>>>>>>>>>>>>> implicitly seems to assume that rocksdb is the only > >>>>>>>>> statestore > >>>>>>>>>>> that > >>>>>>>>>>>>>> Kafka > >>>>>>>>>>>>>>>> Stream supports while that's not the case. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Also, regarding the potential memory pressure that > >>>>> can be > >>>>>>>>>>> introduced > >>>>>>>>>>>>> by > >>>>>>>>>>>>>>>> WriteBatchIndex, do you think it might make more > >>>>> sense to > >>>>>>>>>> include > >>>>>>>>>>>> some > >>>>>>>>>>>>>>>> numbers/benchmarks on how much the memory consumption > >>>>>> might > >>>>>>>>>>>> increase? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Lastly, the read_uncommitted flag's behaviour on IQ > >>>>> may > >>>>>>> need > >>>>>>>>>> more > >>>>>>>>>>>>>>>> elaboration. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> These points aside, as I said, this is a great > >>>>> proposal! > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks! > >>>>>>>>>>>>>>>> Sagar. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Tue, May 24, 2022 at 10:35 PM John Roesler < > >>>>>>>>>>> vvcep...@apache.org> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks for the KIP, Alex! > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I'm really happy to see your proposal. This > >>>>> improvement > >>>>>>>>> fills a > >>>>>>>>>>>>>>>>> long-standing gap. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I have a few questions: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 1. Configuration > >>>>>>>>>>>>>>>>> The KIP only mentions RocksDB, but of course, Streams > >>>>>> also > >>>>>>>>>> ships > >>>>>>>>>>>> with > >>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>> InMemory store, and users also plug in their own > >>>>> custom > >>>>>>>>> state > >>>>>>>>>>>> stores. 
> >>>>>>>>>>>>>> It > >>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> also common to use multiple types of state stores in > >>>>> the > >>>>>>>>> same > >>>>>>>>>>>>>> application > >>>>>>>>>>>>>>>>> for different purposes. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Against this backdrop, the choice to configure > >>>>>>>>> transactionality > >>>>>>>>>>> as > >>>>>>>>>>>> a > >>>>>>>>>>>>>>>>> top-level config, as well as to configure the store > >>>>>>>>> transaction > >>>>>>>>>>>>>> mechanism > >>>>>>>>>>>>>>>>> as a top-level config, seems a bit off. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Did you consider instead just adding the option to > >>>>> the > >>>>>>>>>>>>>>>>> RocksDB*StoreSupplier classes and the factories in > >>>>>> Stores > >>>>>>> ? > >>>>>>>>> It > >>>>>>>>>>>> seems > >>>>>>>>>>>>>> like > >>>>>>>>>>>>>>>>> the desire to enable the feature by default, but > >>>>> with a > >>>>>>>>>>>> feature-flag > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> disable it was a factor here. However, as you pointed > >>>>>> out, > >>>>>>>>>> there > >>>>>>>>>>>> are > >>>>>>>>>>>>>> some > >>>>>>>>>>>>>>>>> major considerations that users should be aware of, > >>>>> so > >>>>>>>>> opt-in > >>>>>>>>>>>> doesn't > >>>>>>>>>>>>>>>> seem > >>>>>>>>>>>>>>>>> like a bad choice, either. You could add an Enum > >>>>>> argument > >>>>>>> to > >>>>>>>>>>> those > >>>>>>>>>>>>>>>>> factories like `RocksDBTransactionalMechanism.{NONE, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Some points in favor of this approach: > >>>>>>>>>>>>>>>>> * Avoid "stores that don't support transactions > >>>>> ignore > >>>>>> the > >>>>>>>>>>> config" > >>>>>>>>>>>>>>>>> complexity > >>>>>>>>>>>>>>>>> * Users can choose how to spend their memory budget, > >>>>>>> making > >>>>>>>>>> some > >>>>>>>>>>>>> stores > >>>>>>>>>>>>>>>>> transactional and others not > >>>>>>>>>>>>>>>>> * When we add transactional support to in-memory > >>>>> stores, > >>>>>>> we > >>>>>>>>>> don't > >>>>>>>>>>>>> have > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> figure out what to do with the mechanism config > >>>>> (i.e., > >>>>>>> what > >>>>>>>>> do > >>>>>>>>>>> you > >>>>>>>>>>>>> set > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> mechanism to when there are multiple kinds of > >>>>>>> transactional > >>>>>>>>>>> stores > >>>>>>>>>>>> in > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> topology?) > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 2. caching/flushing/transactions > >>>>>>>>>>>>>>>>> The coupling between memory usage and flushing that > >>>>> you > >>>>>>>>>> mentioned > >>>>>>>>>>>> is > >>>>>>>>>>>>> a > >>>>>>>>>>>>>>>> bit > >>>>>>>>>>>>>>>>> troubling. It also occurs to me that there seems to > >>>>> be > >>>>>>> some > >>>>>>>>>>>>>> relationship > >>>>>>>>>>>>>>>>> with the existing record cache, which is also an > >>>>>> in-memory > >>>>>>>>>>> holding > >>>>>>>>>>>>> area > >>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>> records that are not yet written to the cache and/or > >>>>>> store > >>>>>>>>>>> (albeit > >>>>>>>>>>>>> with > >>>>>>>>>>>>>>>> no > >>>>>>>>>>>>>>>>> particular semantics). Have you considered how all > >>>>> these > >>>>>>>>>>> components > >>>>>>>>>>>>>>>> should > >>>>>>>>>>>>>>>>> relate? For example, should a "full" WriteBatch > >>>>> actually > >>>>>>>>>> trigger > >>>>>>>>>>> a > >>>>>>>>>>>>>> flush > >>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>> that we don't get OOMEs? 
If the proposed > >>>>> transactional > >>>>>>>>>> mechanism > >>>>>>>>>>>>> forces > >>>>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>>> uncommitted writes to be buffered in memory, until a > >>>>>>> commit, > >>>>>>>>>> then > >>>>>>>>>>>>> what > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> the advantage over just doing the same thing with the > >>>>>>>>>> RecordCache > >>>>>>>>>>>> and > >>>>>>>>>>>>>> not > >>>>>>>>>>>>>>>>> introducing the WriteBatch at all? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 3. ALOS > >>>>>>>>>>>>>>>>> You mentioned that a transactional store can help > >>>>> reduce > >>>>>>>>>>>> duplication > >>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>> the case of ALOS. We might want to be careful about > >>>>>> claims > >>>>>>>>> like > >>>>>>>>>>>> that. > >>>>>>>>>>>>>>>>> Duplication isn't the way that repeated processing > >>>>>>>>> manifests in > >>>>>>>>>>>> state > >>>>>>>>>>>>>>>>> stores. Rather, it is in the form of dirty reads > >>>>> during > >>>>>>>>>>>> reprocessing. > >>>>>>>>>>>>>>>> This > >>>>>>>>>>>>>>>>> feature may reduce the incidence of dirty reads > >>>>> during > >>>>>>>>>>>> reprocessing, > >>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>> not in a predictable way. During regular processing > >>>>>> today, > >>>>>>>>> we > >>>>>>>>>>> will > >>>>>>>>>>>>> send > >>>>>>>>>>>>>>>>> some records through to the changelog in between > >>>>> commit > >>>>>>>>>>> intervals. > >>>>>>>>>>>>>> Under > >>>>>>>>>>>>>>>>> ALOS, if any of those dirty writes gets committed to > >>>>> the > >>>>>>>>>>> changelog > >>>>>>>>>>>>>> topic, > >>>>>>>>>>>>>>>>> then upon failure, we have to roll the store forward > >>>>> to > >>>>>>> them > >>>>>>>>>>>> anyway, > >>>>>>>>>>>>>>>>> regardless of this new transactional mechanism. > >>>>> That's a > >>>>>>>>>> fixable > >>>>>>>>>>>>>> problem, > >>>>>>>>>>>>>>>>> by the way, but this KIP doesn't seem to fix it. I > >>>>>> wonder > >>>>>>>>> if we > >>>>>>>>>>>>> should > >>>>>>>>>>>>>>>> make > >>>>>>>>>>>>>>>>> any claims about the relationship of this feature to > >>>>>> ALOS > >>>>>>> if > >>>>>>>>>> the > >>>>>>>>>>>>>>>> real-world > >>>>>>>>>>>>>>>>> behavior is so complex. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 4. IQ > >>>>>>>>>>>>>>>>> As a reminder, we have a new IQv2 mechanism now. > >>>>> Should > >>>>>> we > >>>>>>>>>>> propose > >>>>>>>>>>>>> any > >>>>>>>>>>>>>>>>> changes to IQv1 to support this transactional > >>>>> mechanism, > >>>>>>>>> versus > >>>>>>>>>>>> just > >>>>>>>>>>>>>>>>> proposing it for IQv2? Certainly, it seems strange > >>>>> only > >>>>>> to > >>>>>>>>>>> propose > >>>>>>>>>>>> a > >>>>>>>>>>>>>>>> change > >>>>>>>>>>>>>>>>> for IQv1 and not v2. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Regarding your proposal for IQv1, I'm unsure what the > >>>>>>>>> behavior > >>>>>>>>>>>> should > >>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>> for readCommitted, since the current behavior also > >>>>> reads > >>>>>>>>> out of > >>>>>>>>>>> the > >>>>>>>>>>>>>>>>> RecordCache. I guess if readCommitted==false, then we > >>>>>> will > >>>>>>>>>>> continue > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>> read > >>>>>>>>>>>>>>>>> from the cache first, then the Batch, then the store; > >>>>>> and > >>>>>>> if > >>>>>>>>>>>>>>>>> readCommitted==true, we would skip the cache and the > >>>>>> Batch > >>>>>>>>> and > >>>>>>>>>>> only > >>>>>>>>>>>>>> read > >>>>>>>>>>>>>>>>> from the persistent RocksDB store? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> What should IQ do if I request to readCommitted on a > >>>>>>>>>>>>> non-transactional > >>>>>>>>>>>>>>>>> store? 
> >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks again for proposing the KIP, and my apologies > >>>>> for > >>>>>>> the > >>>>>>>>>> long > >>>>>>>>>>>>>> reply; > >>>>>>>>>>>>>>>>> I'm hoping to air all my concerns in one "batch" to > >>>>> save > >>>>>>>>> time > >>>>>>>>>> for > >>>>>>>>>>>>> you. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>> -John > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov > >>>>>>> wrote: > >>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I've written a KIP for making Kafka Streams state > >>>>>> stores > >>>>>>>>>>>>> transactional > >>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> would like to start a discussion: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>> Alex > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -- > >>>>>>>>>>>>> > >>>>>>>>>>>>> [image: Confluent] <https://www.confluent.io> > >>>>>>>>>>>>> Suhas Satish > >>>>>>>>>>>>> Engineering Manager > >>>>>>>>>>>>> Follow us: [image: Blog] > >>>>>>>>>>>>> < > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://www.confluent.io/blog?utm_source=footer&utm_medium=email&utm_campaign=ch.email-signature_type.community_content.blog > >>>>>>>>>>>>>> [image: > >>>>>>>>>>>>> Twitter] <https://twitter.com/ConfluentInc>[image: > >>>>> LinkedIn] > >>>>>>>>>>>>> <https://www.linkedin.com/company/confluent/> > >>>>>>>>>>>>> > >>>>>>>>>>>>> [image: Try Confluent Cloud for Free] > >>>>>>>>>>>>> < > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >> > https://www.confluent.io/get-started?utm_campaign=tm.fm-apac_cd.inbound&utm_source=gmail&utm_medium=organic > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -- > >>>>>>>>>>> -- Guozhang > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> -- Guozhang > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>>> > >>>>>> -- > >>>>>> -- Guozhang > >>>>>> > >>>>> > >>>> > >>> > >> > > >