Hey Nick,

Thank you for the prototype testing and benchmarking, and sorry for the late reply!
I agree that it is worth revisiting the WriteBatchWithIndex approach. I will implement a fork of the current prototype that uses that mechanism to ensure transactionality and let you know when it is ready for review/testing in this ML thread. As for time estimates, I might not have enough time to finish the prototype in December, so it will probably be ready for review in January.

Best,
Alex

On Fri, Nov 11, 2022 at 4:24 PM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> Sorry to dredge this up again. I've had a chance to start doing some
> testing with the WIP Pull Request, and it appears as though the secondary
> store solution performs rather poorly.
>
> In our testing, we had a non-transactional state store that would restore
> (from scratch), at a rate of nearly 1,000,000 records/second. When we
> switched it to a transactional store, it restored at a rate of less than
> 40,000 records/second.
>
> I suspect the key issues here are having to copy the data out of the
> temporary store and into the main store on-commit, and to a lesser extent,
> the extra memory copies during writes.
>
> I think it's worth re-visiting the WriteBatchWithIndex solution, as it's
> clear from the RocksDB post[1] on the subject that it's the recommended way
> to achieve transactionality.
>
> The only issue you identified with this solution was that uncommitted
> writes are required to entirely fit in-memory, and RocksDB recommends they
> don't exceed 3-4MiB. If we do some back-of-the-envelope calculations, I
> think we'll find that this will be a non-issue for all but the most extreme
> cases, and for those, I think I have a fairly simple solution.
>
> Firstly, when EOS is enabled, the default commit.interval.ms is set to
> 100ms, which provides fairly short intervals that uncommitted writes need
> to be buffered in-memory.
> If we assume a worst case of 1024 byte records
> (and for most cases, they should be much smaller), then 4MiB would hold
> ~4096 records, which with 100ms commit intervals is a throughput of
> approximately 40,960 records/second. This seems quite reasonable.
>
> For use cases that wouldn't reasonably fit in-memory, my suggestion is that
> we have a mechanism that tracks the number/size of uncommitted records in
> stores, and prematurely commits the Task when this size exceeds a
> configured threshold.
>
> Thanks for your time, and let me know what you think!
> --
> Nick
>
> 1: https://rocksdb.org/blog/2015/02/27/write-batch-with-index.html
>
> On Thu, 6 Oct 2022 at 19:31, Alexander Sorokoumov
> <asorokou...@confluent.io.invalid> wrote:
>
> > Hey Nick,
> >
> > It is going to be option c. Existing state is considered to be committed
> > and there will be an additional RocksDB for uncommitted writes.
> >
> > I am out of office until October 24. I will update KIP and make sure that
> > we have an upgrade test for that after coming back from vacation.
> >
> > Best,
> > Alex
> >
> > On Thu, Oct 6, 2022 at 5:06 PM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I realise this has already been voted on and accepted, but it occurred
> > > to me today that the KIP doesn't define the migration/upgrade path for
> > > existing non-transactional StateStores that *become* transactional,
> > > i.e. by adding the transactional boolean to the StateStore constructor.
> > >
> > > What would be the result, when such a change is made to a Topology,
> > > without explicitly wiping the application state?
> > > a) An error.
> > > b) Local state is wiped.
> > > c) Existing RocksDB database is used as committed writes and new RocksDB
> > > database is created for uncommitted writes.
> > > d) Something else?
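[Editor's note: the WriteBatchWithIndex idea discussed above — reads consult an indexed batch of uncommitted writes before falling back to the committed store, and a size threshold triggers a premature commit — can be sketched as a plain-Java, in-memory analogue. This is illustrative only, not RocksDB's actual WriteBatchWithIndex API; all class and method names are hypothetical.]

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative in-memory analogue of an uncommitted-write overlay in the
 * spirit of RocksDB's WriteBatchWithIndex. Reads check the indexed batch of
 * uncommitted writes first and fall back to the committed store; commit()
 * applies the batch in one step and clears it. The store also tracks the
 * uncommitted byte size, so a caller can commit the Task early when a
 * configured threshold is exceeded, as suggested in the email above.
 */
class UncommittedOverlayStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();
    private long uncommittedBytes = 0;
    private final long maxUncommittedBytes;

    UncommittedOverlayStore(final long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    void put(final String key, final String value) {
        uncommitted.put(key, value);
        uncommittedBytes += key.length() + value.length();
    }

    /** Read-your-own-writes: uncommitted data shadows committed data. */
    String get(final String key) {
        final String v = uncommitted.get(key);
        return v != null ? v : committed.get(key);
    }

    /** True when the task should commit early to bound memory usage. */
    boolean shouldCommitEarly() {
        return uncommittedBytes > maxUncommittedBytes;
    }

    void commit() {
        committed.putAll(uncommitted); // apply the whole batch, no per-record copy on restore
        uncommitted.clear();
        uncommittedBytes = 0;
    }

    /** Crash/rollback: uncommitted writes simply vanish. */
    void abort() {
        uncommitted.clear();
        uncommittedBytes = 0;
    }
}
```

The key property this models is that uncommitted writes never touch the committed store, so aborting is free and committing is a single batch apply rather than a copy between two stores.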
> > > > > > Regards, > > > > > > Nick > > > > > > On Thu, 1 Sept 2022 at 12:16, Alexander Sorokoumov > > > <asorokou...@confluent.io.invalid> wrote: > > > > > > > Hey Guozhang, > > > > > > > > Sounds good. I annotated all added StateStore methods (commit, > recover, > > > > transactional) with @Evolving. > > > > > > > > Best, > > > > Alex > > > > > > > > > > > > > > > > On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang <wangg...@gmail.com> > > > wrote: > > > > > > > > > Hello Alex, > > > > > > > > > > Thanks for the detailed replies, I think that makes sense, and in > the > > > > long > > > > > run we would need some public indicators from StateStore to > determine > > > if > > > > > checkpoints can really be used to indicate clean snapshots. > > > > > > > > > > As for the @Evolving label, I think we can still keep it but for a > > > > > different reason, since as we add more state management > > functionalities > > > > in > > > > > the near future we may need to revisit the public APIs again and > > hence > > > > > keeping it as @Evolving would allow us to modify if necessary, in > an > > > > easier > > > > > path than deprecate -> delete after several minor releases. > > > > > > > > > > Besides that, I have no further comments about the KIP. > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > On Fri, Aug 26, 2022 at 1:51 AM Alexander Sorokoumov > > > > > <asorokou...@confluent.io.invalid> wrote: > > > > > > > > > > > Hey Guozhang, > > > > > > > > > > > > > > > > > > I think that we will have to keep StateStore#transactional() > > because > > > > > > post-commit checkpointing of non-txn state stores will break the > > > > > guarantees > > > > > > we want in > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint > > > > for > > > > > > correct recovery. Let's consider checkpoint-recovery behavior > under > > > EOS > > > > > > that we want to support: > > > > > > > > > > > > 1. 
Non-txn state stores should checkpoint on graceful shutdown > and > > > > > restore > > > > > > from that checkpoint. > > > > > > > > > > > > 2. Non-txn state stores should delete local data during recovery > > > after > > > > a > > > > > > crash failure. > > > > > > > > > > > > 3. Txn state stores should checkpoint on commit and on graceful > > > > shutdown. > > > > > > These stores should roll back uncommitted changes instead of > > deleting > > > > all > > > > > > local data. > > > > > > > > > > > > > > > > > > #1 and #2 are already supported; this proposal adds #3. > > Essentially, > > > we > > > > > > have two parties at play here - the post-commit checkpointing in > > > > > > StreamTask#postCommit and recovery in ProcessorStateManager# > > > > > > initializeStoreOffsetsFromCheckpoint. Together, these methods > must > > > > allow > > > > > > all three workflows and prevent invalid behavior, e.g., non-txn > > > stores > > > > > > should not checkpoint post-commit to avoid keeping uncommitted > data > > > on > > > > > > recovery. > > > > > > > > > > > > > > > > > > In the current state of the prototype, we checkpoint only txn > state > > > > > stores > > > > > > post-commit under EOS using StateStore#transactional(). If we > > remove > > > > > > StateStore#transactional() and always checkpoint post-commit, > > > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint will > > have > > > to > > > > > > determine whether to delete local data. Non-txn implementation of > > > > > > StateStore#recover can't detect if it has uncommitted writes. > Since > > > its > > > > > > default implementation must always return either true or false, > > > > signaling > > > > > > whether it is restored into a valid committed-only state. If > > > > > > StateStore#recover always returns true, we preserve uncommitted > > > writes > > > > > and > > > > > > violate correctness. 
Otherwise, ProcessorStateManager# > > > > > > initializeStoreOffsetsFromCheckpoint would always delete local > data > > > > even > > > > > > after > > > > > > a graceful shutdown. > > > > > > > > > > > > > > > > > > With StateStore#transactional we avoid checkpointing non-txn > state > > > > stores > > > > > > and prevent that problem during recovery. > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > > Alex > > > > > > > > > > > > On Fri, Aug 19, 2022 at 1:05 AM Guozhang Wang < > wangg...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > Hello Alex, > > > > > > > > > > > > > > Thanks for the replies! > > > > > > > > > > > > > > > As long as we allow custom user implementations of that > > > interface, > > > > we > > > > > > > should > > > > > > > probably either keep that flag to distinguish between > > transactional > > > > and > > > > > > > non-transactional implementations or change the contract behind > > the > > > > > > > interface. What do you think? > > > > > > > > > > > > > > Regarding this question, I thought that in the long run, we may > > > > always > > > > > > > write checkpoints regardless of txn v.s. non-txn stores, in > which > > > > case > > > > > we > > > > > > > would not need that `StateStore#transactional()`. But for now > in > > > > order > > > > > > for > > > > > > > backward compatibility edge cases we still need to distinguish > on > > > > > whether > > > > > > > or not to write checkpoints. Maybe I was mis-reading its > > purposes? > > > If > > > > > > yes, > > > > > > > please let me know. > > > > > > > > > > > > > > > > > > > > > On Mon, Aug 15, 2022 at 7:56 AM Alexander Sorokoumov > > > > > > > <asorokou...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > Hey Guozhang, > > > > > > > > > > > > > > > > Thank you for elaborating! I like your idea to introduce a > > > > > > StreamsConfig > > > > > > > > specifically for the default store APIs. 
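[Editor's note: the recovery argument in the preceding emails — transactional stores may trust their checkpoint after a crash, while non-transactional stores under EOS must be wiped unless the shutdown was clean — condenses into a small decision function. This is a hypothetical simplification for illustration, not the actual ProcessorStateManager#initializeStoreOffsetsFromCheckpoint code.]

```java
/**
 * Hypothetical sketch of the checkpoint-recovery decision discussed in the
 * thread. Returns the changelog offset from which restoration should start,
 * or -1 to signal that local state must be wiped and rebuilt from scratch.
 */
class RecoveryDecision {
    static long restoreFrom(final boolean eosEnabled,
                            final boolean transactionalStore,
                            final boolean cleanShutdown,
                            final long checkpointedOffset) {
        if (transactionalStore) {
            // Txn stores roll back (or forward) to a committed state, so the
            // checkpoint is trustworthy even after a crash failure (#3 above).
            return checkpointedOffset;
        }
        if (!eosEnabled || cleanShutdown) {
            // Non-txn store, but no uncommitted data can be present (#1 above).
            return checkpointedOffset;
        }
        // Non-txn store under EOS after a crash: local data may contain
        // uncommitted writes and must be discarded (#2 above).
        return -1L;
    }
}
```

This is why StateStore#transactional() matters: without it, the third branch cannot be distinguished from the first, which is exactly the correctness problem described above.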
You mentioned > > > > Materialized, > > > > > > but > > > > > > > I > > > > > > > > think changes in StreamJoined follow the same logic. > > > > > > > > > > > > > > > > I updated the KIP and the prototype according to your > > > suggestions: > > > > > > > > * Add a new StoreType and a StreamsConfig for transactional > > > > RocksDB. > > > > > > > > * Decide whether Materialized/StreamJoined are transactional > > > based > > > > on > > > > > > the > > > > > > > > configured StoreType. > > > > > > > > * Move RocksDBTransactionalMechanism to > > > > > > > > org.apache.kafka.streams.state.internals to remove it from > the > > > > > proposal > > > > > > > > scope. > > > > > > > > * Add a flag in new Stores methods to configure a state store > > as > > > > > > > > transactional. Transactional state stores use the default > > > > > transactional > > > > > > > > mechanism. > > > > > > > > * The changes above allowed to remove all changes to the > > > > > StoreSupplier > > > > > > > > interface. > > > > > > > > > > > > > > > > I am not sure about marking StateStore#transactional() as > > > evolving. > > > > > As > > > > > > > long > > > > > > > > as we allow custom user implementations of that interface, we > > > > should > > > > > > > > probably either keep that flag to distinguish between > > > transactional > > > > > and > > > > > > > > non-transactional implementations or change the contract > behind > > > the > > > > > > > > interface. What do you think? > > > > > > > > > > > > > > > > Best, > > > > > > > > Alex > > > > > > > > > > > > > > > > On Thu, Aug 11, 2022 at 1:00 AM Guozhang Wang < > > > wangg...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hello Alex, > > > > > > > > > > > > > > > > > > Thanks for the replies. Regarding the global config v.s. 
> > > > per-store > > > > > > > spec, > > > > > > > > I > > > > > > > > > agree with John's early comments to some degrees, but I > think > > > we > > > > > may > > > > > > > well > > > > > > > > > distinguish a couple scenarios here. In sum we are > discussing > > > > about > > > > > > the > > > > > > > > > following levels of per-store spec: > > > > > > > > > > > > > > > > > > * Materialized#transactional() > > > > > > > > > * StoreSupplier#transactional() > > > > > > > > > * StateStore#transactional() > > > > > > > > > * Stores.persistentTransactionalKeyValueStore()... > > > > > > > > > > > > > > > > > > And my thoughts are the following: > > > > > > > > > > > > > > > > > > * In the current proposal users could specify transactional > > as > > > > > either > > > > > > > > > "Materialized.as("storeName").withTransantionsEnabled()" or > > > > > > > > > > > > > "Materialized.as(Stores.persistentTransactionalKeyValueStore(..))", > > > > > > > which > > > > > > > > > seems not necessary to me. In general, the more options the > > > > library > > > > > > > > > provides, the messier for users to learn the new APIs. > > > > > > > > > > > > > > > > > > * When using built-in stores, users would usually go with > > > > > > > > > Materialized.as("storeName"). In such cases I feel it's not > > > very > > > > > > > > meaningful > > > > > > > > > to specify "some of the built-in stores to be > transactional, > > > > while > > > > > > > others > > > > > > > > > be non transactional": as long as one of your stores are > > > > > > > > non-transactional, > > > > > > > > > you'd still pay for large restoration cost upon unclean > > > failure. 
> > > > > > People > > > > > > > > > may, indeed, want to specify if different transactional > > > > mechanisms > > > > > to > > > > > > > be > > > > > > > > > used across stores; but for whether or not the stores > should > > be > > > > > > > > > transactional, I feel it's really an "all or none" answer, > > and > > > > our > > > > > > > > built-in > > > > > > > > > form (rocksDB) should support transactionality for all > store > > > > types. > > > > > > > > > > > > > > > > > > * When using customized stores, users would usually go with > > > > > > > > > Materialized.as(StoreSupplier). And it's possible if users > > > would > > > > > > choose > > > > > > > > > some to be transactional while others non-transactional > (e.g. > > > if > > > > > > their > > > > > > > > > customized store only supports transactional for some store > > > > types, > > > > > > but > > > > > > > > not > > > > > > > > > others). > > > > > > > > > > > > > > > > > > * At a per-store level, the library do not really care, or > > need > > > > to > > > > > > know > > > > > > > > > whether that store is transactional or not at runtime, > except > > > for > > > > > > > > > compatibility reasons today we want to make sure the > written > > > > > > checkpoint > > > > > > > > > files do not include those non-transactional stores. But > this > > > > check > > > > > > > would > > > > > > > > > eventually go away as one day we would always checkpoint > > files. 
> > > > > > > > > > > > > > > > > > --------------------------- > > > > > > > > > > > > > > > > > > With all of that in mind, my gut feeling is that: > > > > > > > > > > > > > > > > > > * Materialized#transactional(): we would not need this > knob, > > > > since > > > > > > for > > > > > > > > > built-in stores I think just a global config should be > > > sufficient > > > > > > (see > > > > > > > > > below), while for customized store users would need to > > specify > > > > that > > > > > > via > > > > > > > > the > > > > > > > > > StoreSupplier anyways and not through this API. Hence I > think > > > for > > > > > > > either > > > > > > > > > case we do not need to expose such a knob on the > Materialized > > > > > level. > > > > > > > > > > > > > > > > > > * Stores.persistentTransactionalKeyValueStore(): I think we > > > could > > > > > > > > refactor > > > > > > > > > that function without introducing new constructors in the > > > Stores > > > > > > > factory, > > > > > > > > > but just add new overloads to the existing func name e.g. > > > > > > > > > > > > > > > > > > ``` > > > > > > > > > persistentKeyValueStore(final String name, final boolean > > > > > > transactional) > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > Plus we can augment the storeImplType as introduced in > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store > > > > > > > > > as a syntax sugar for users, e.g. 
> > > > > > > > > > > > > > > > > > ``` > > > > > > > > > public enum StoreImplType { > > > > > > > > > ROCKS_DB, > > > > > > > > > TXN_ROCKS_DB, > > > > > > > > > IN_MEMORY > > > > > > > > > } > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > stream.groupByKey().count(Materialized.withStoreType(StoreImplType.TXN_ > > > > > > > > > ROCKS_DB)); > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > The above provides this global config at the store impl > type > > > > level. > > > > > > > > > > > > > > > > > > * RocksDBTransactionalMechanism: I agree with Bruno that we > > > would > > > > > > > better > > > > > > > > > not expose this knob to users, but rather keep it purely as > > an > > > > impl > > > > > > > > detail > > > > > > > > > abstracted from the "TXN_ROCKS_DB" type. Over time we may, > > e.g. > > > > use > > > > > > > > > in-memory stores as the secondary stores with optional > > > > > spill-to-disks > > > > > > > > when > > > > > > > > > we hit the memory limit, but all of that optimizations in > the > > > > > future > > > > > > > > should > > > > > > > > > be kept away from the users. > > > > > > > > > > > > > > > > > > * StoreSupplier#transactional() / > StateStore#transactional(): > > > the > > > > > > first > > > > > > > > > flag is only used to be passed into the StateStore layer, > for > > > > > > > indicating > > > > > > > > if > > > > > > > > > we should write checkpoints; we could mark it as @evolving > so > > > > that > > > > > we > > > > > > > can > > > > > > > > > one day remove it without a long deprecation period. 
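[Editor's note: the store-type sugar Guozhang sketches above could resolve to built-in suppliers roughly as below. This is a hypothetical helper, not the KIP-591 implementation; only the enum constants come from the email.]

```java
/**
 * Sketch of the proposed config-level store type: the enum value alone
 * determines both persistence and transactionality of built-in stores,
 * so users never touch RocksDBTransactionalMechanism directly.
 */
class DefaultStoreTypeConfig {
    enum StoreImplType {
        ROCKS_DB,
        TXN_ROCKS_DB,
        IN_MEMORY
    }

    /** Maps to e.g. persistentKeyValueStore(name, transactional=true) for TXN_ROCKS_DB. */
    static boolean isTransactional(final StoreImplType type) {
        return type == StoreImplType.TXN_ROCKS_DB;
    }

    static boolean isPersistent(final StoreImplType type) {
        return type != StoreImplType.IN_MEMORY;
    }
}
```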
> > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 10, 2022 at 8:04 AM Alexander Sorokoumov > > > > > > > > > <asorokou...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > Hey Guozhang, Bruno, > > > > > > > > > > > > > > > > > > > > Thank you for your feedback. I am going to respond to > both > > of > > > > you > > > > > > in > > > > > > > a > > > > > > > > > > single email. I hope it is okay. > > > > > > > > > > > > > > > > > > > > @Guozhang, > > > > > > > > > > > > > > > > > > > > We could, instead, have a global > > > > > > > > > > > config to specify if the built-in stores should be > > > > > transactional > > > > > > or > > > > > > > > > not. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This was the original approach I took in this proposal. > > > Earlier > > > > > in > > > > > > > this > > > > > > > > > > thread John, Sagar, and Bruno listed a number of issues > > with > > > > it. > > > > > I > > > > > > > tend > > > > > > > > > to > > > > > > > > > > agree with them that it is probably better user > experience > > to > > > > > > control > > > > > > > > > > transactionality via Materialized objects. > > > > > > > > > > > > > > > > > > > > We could simplify our implementation for `commit` > > > > > > > > > > > > > > > > > > > > Agreed! I updated the prototype and removed references to > > the > > > > > > commit > > > > > > > > > marker > > > > > > > > > > and rolling forward from the proposal. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > @Bruno, > > > > > > > > > > > > > > > > > > > > So, I would remove the details about the 2-state-store > > > > > > implementation > > > > > > > > > > > from the KIP or provide it as an example of a possible > > > > > > > implementation > > > > > > > > > at > > > > > > > > > > > the end of the KIP. 
> > > > > > > > > > > > > > > > > > > > > I moved the section about the 2-state-store > implementation > > to > > > > the > > > > > > > > bottom > > > > > > > > > of > > > > > > > > > > the proposal and always mention it as a reference > > > > implementation. > > > > > > > > Please > > > > > > > > > > let me know if this is okay. > > > > > > > > > > > > > > > > > > > > Could you please describe the usage of commit() and > > recover() > > > > in > > > > > > the > > > > > > > > > > > commit workflow in the KIP as we did in this thread but > > > > > > > independently > > > > > > > > > > > from the state store implementation? > > > > > > > > > > > > > > > > > > > > I described how commit/recover change the workflow in the > > > > > Overview > > > > > > > > > section. > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > Alex > > > > > > > > > > > > > > > > > > > > On Wed, Aug 10, 2022 at 10:07 AM Bruno Cadonna < > > > > > cado...@apache.org > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi Alex, > > > > > > > > > > > > > > > > > > > > > > Thank a lot for explaining! > > > > > > > > > > > > > > > > > > > > > > Now some aspects are clearer to me. > > > > > > > > > > > > > > > > > > > > > > While I understand now, how the state store can roll > > > > forward, I > > > > > > > have > > > > > > > > > the > > > > > > > > > > > feeling that rolling forward is specific to the > > > 2-state-store > > > > > > > > > > > implementation with RocksDB of your PoC. Other state > > store > > > > > > > > > > > implementations might use a different strategy to react > > to > > > > > > crashes. > > > > > > > > For > > > > > > > > > > > example, they might apply an atomic write and > effectively > > > > > > rollback > > > > > > > if > > > > > > > > > > > they crash before committing the state store > > transaction. 
I > > > > > think > > > > > > > the > > > > > > > > > > > KIP should not contain such implementation details but > > > > provide > > > > > an > > > > > > > > > > > interface to accommodate rolling forward and rolling > > > > backward. > > > > > > > > > > > > > > > > > > > > > > So, I would remove the details about the 2-state-store > > > > > > > implementation > > > > > > > > > > > from the KIP or provide it as an example of a possible > > > > > > > implementation > > > > > > > > > at > > > > > > > > > > > the end of the KIP. > > > > > > > > > > > > > > > > > > > > > > Since a state store implementation can roll forward or > > roll > > > > > > back, I > > > > > > > > > > > think it is fine to return the changelog offset from > > > > recover(). > > > > > > > With > > > > > > > > > the > > > > > > > > > > > returned changelog offset, Streams knows from where to > > > start > > > > > > state > > > > > > > > > store > > > > > > > > > > > restoration. > > > > > > > > > > > > > > > > > > > > > > Could you please describe the usage of commit() and > > > recover() > > > > > in > > > > > > > the > > > > > > > > > > > commit workflow in the KIP as we did in this thread but > > > > > > > independently > > > > > > > > > > > from the state store implementation? That would make > > things > > > > > > > clearer. > > > > > > > > > > > Additionally, descriptions of failure scenarios would > > also > > > be > > > > > > > > helpful. > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > > > > > Bruno > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 04.08.22 16:39, Alexander Sorokoumov wrote: > > > > > > > > > > > > Hey Bruno, > > > > > > > > > > > > > > > > > > > > > > > > Thank you for the suggestions and the clarifying > > > > questions. 
I > > > > > > > > believe > > > > > > > > > > > that > > > > > > > > > > > > they cover the core of this proposal, so it is > crucial > > > for > > > > us > > > > > > to > > > > > > > be > > > > > > > > > on > > > > > > > > > > > the > > > > > > > > > > > > same page. > > > > > > > > > > > > > > > > > > > > > > > > 1. Don't you want to deprecate StateStore#flush(). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Good call! I updated both the proposal and the > > prototype. > > > > > > > > > > > > > > > > > > > > > > > > 2. I would shorten > > > > > Materialized#withTransactionalityEnabled() > > > > > > > to > > > > > > > > > > > >> Materialized#withTransactionsEnabled(). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Turns out, these methods are no longer necessary. I > > > removed > > > > > > them > > > > > > > > from > > > > > > > > > > the > > > > > > > > > > > > proposal and the prototype. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> 3. Could you also describe a bit more in detail > where > > > the > > > > > > > offsets > > > > > > > > > > passed > > > > > > > > > > > >> into commit() and recover() come from? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The offset passed into StateStore#commit is the last > > > offset > > > > > > > > committed > > > > > > > > > > to > > > > > > > > > > > > the changelog topic. The offset passed into > > > > > StateStore#recover > > > > > > is > > > > > > > > the > > > > > > > > > > > last > > > > > > > > > > > > checkpointed offset for the given StateStore. Let's > > look > > > at > > > > > > > steps 3 > > > > > > > > > > and 4 > > > > > > > > > > > > in the commit workflow. After the > > > TaskExecutor/TaskManager > > > > > > > commits, > > > > > > > > > it > > > > > > > > > > > calls > > > > > > > > > > > > StreamTask#postCommit[1] that in turn: > > > > > > > > > > > > a. 
updates the changelog offsets via > > > > > > > > > > > > ProcessorStateManager#updateChangelogOffsets[2]. The > > > > offsets > > > > > > here > > > > > > > > > come > > > > > > > > > > > from > > > > > > > > > > > > the RecordCollector[3], which tracks the latest > offsets > > > the > > > > > > > > producer > > > > > > > > > > sent > > > > > > > > > > > > without exception[4, 5]. > > > > > > > > > > > > b. flushes/commits the state store in > > > > > > > > > AbstractTask#maybeCheckpoint[6]. > > > > > > > > > > > This > > > > > > > > > > > > method essentially calls ProcessorStateManager > methods > > - > > > > > > > > > > flush/commit[7] > > > > > > > > > > > > and checkpoint[8]. ProcessorStateManager#commit goes > > over > > > > all > > > > > > > state > > > > > > > > > > > stores > > > > > > > > > > > > that belong to that task and commits them with the > > offset > > > > > > > obtained > > > > > > > > in > > > > > > > > > > > step > > > > > > > > > > > > `a`. ProcessorStateManager#checkpoint writes down > those > > > > > offsets > > > > > > > for > > > > > > > > > all > > > > > > > > > > > > state stores, except for non-transactional ones in > the > > > case > > > > > of > > > > > > > EOS. > > > > > > > > > > > > > > > > > > > > > > > > During initialization, StreamTask calls > > > > > > > > > > > > StateManagerUtil#registerStateStores[8] that in turn > > > calls > > > > > > > > > > > > > > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9]. > > > > > > At > > > > > > > > the > > > > > > > > > > > > moment, this method assigns checkpointed offsets to > the > > > > > > > > corresponding > > > > > > > > > > > state > > > > > > > > > > > > stores[10]. The prototype also calls > StateStore#recover > > > > with > > > > > > the > > > > > > > > > > > > checkpointed offset and assigns the offset returned > by > > > > > > > > recover()[11]. > > > > > > > > > > > > > > > > > > > > > > > > 4. 
I do not quite understand how a state store can > roll > > > > > > forward. > > > > > > > > You > > > > > > > > > > > >> mention in the thread the following: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The 2-state-stores commit looks like this [12]: > > > > > > > > > > > > > > > > > > > > > > > > 1. Flush the temporary state store. > > > > > > > > > > > > 2. Create a commit marker with a changelog offset > > > > > > > corresponding > > > > > > > > > to > > > > > > > > > > > the > > > > > > > > > > > > state we are committing. > > > > > > > > > > > > 3. Go over all keys in the temporary store and > > write > > > > them > > > > > > > down > > > > > > > > to > > > > > > > > > > the > > > > > > > > > > > > main one. > > > > > > > > > > > > 4. Wipe the temporary store. > > > > > > > > > > > > 5. Delete the commit marker. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Let's consider crash failure scenarios: > > > > > > > > > > > > > > > > > > > > > > > > - Crash failure happens between steps 1 and 2. > The > > > main > > > > > > state > > > > > > > > > store > > > > > > > > > > > is > > > > > > > > > > > > in a consistent state that corresponds to the > > > > previously > > > > > > > > > > checkpointed > > > > > > > > > > > > offset. StateStore#recover throws away the > > temporary > > > > > store > > > > > > > and > > > > > > > > > > > proceeds > > > > > > > > > > > > from the last checkpointed offset. > > > > > > > > > > > > - Crash failure happens between steps 2 and 3. We > > do > > > > not > > > > > > know > > > > > > > > > what > > > > > > > > > > > keys > > > > > > > > > > > > from the temporary store were already written to > > the > > > > main > > > > > > > > store, > > > > > > > > > so > > > > > > > > > > > we > > > > > > > > > > > > can't roll back. There are two options - either > > wipe > > > > the > > > > > > main > > > > > > > > > store > > > > > > > > > > > or roll > > > > > > > > > > > > forward. 
Since the point of this proposal is to > > avoid > > > > > > > > situations > > > > > > > > > > > where we > > > > > > > > > > > > throw away the state and we do not care to what > > > > > consistent > > > > > > > > state > > > > > > > > > > the > > > > > > > > > > > store > > > > > > > > > > > > rolls to, we roll forward by continuing from step > > 3. > > > > > > > > > > > > - Crash failure happens between steps 3 and 4. We > > > can't > > > > > > > > > distinguish > > > > > > > > > > > > between this and the previous scenario, so we > write > > > all > > > > > the > > > > > > > > keys > > > > > > > > > > > from the > > > > > > > > > > > > temporary store. This is okay because the > operation > > > is > > > > > > > > > idempotent. > > > > > > > > > > > > - Crash failure happens between steps 4 and 5. > > Again, > > > > we > > > > > > > can't > > > > > > > > > > > > distinguish between this and previous scenarios, > > but > > > > the > > > > > > > > > temporary > > > > > > > > > > > store is > > > > > > > > > > > > already empty. Even though we write all keys from > > the > > > > > > > temporary > > > > > > > > > > > store, this > > > > > > > > > > > > operation is, in fact, no-op. > > > > > > > > > > > > - Crash failure happens between step 5 and > > > checkpoint. > > > > > This > > > > > > > is > > > > > > > > > the > > > > > > > > > > > case > > > > > > > > > > > > you referred to in question 5. The commit is > > > finished, > > > > > but > > > > > > it > > > > > > > > is > > > > > > > > > > not > > > > > > > > > > > > reflected at the checkpoint. recover() returns > the > > > > offset > > > > > > of > > > > > > > > the > > > > > > > > > > > previous > > > > > > > > > > > > commit here, which is incorrect, but it is okay > > > because > > > > > we > > > > > > > will > > > > > > > > > > > replay the > > > > > > > > > > > > changelog from the previously committed offset. 
> As > > > > > > changelog > > > > > > > > > replay > > > > > > > > > > > is > > > > > > > > > > > > idempotent, the state store recovers into a > > > consistent > > > > > > state. > > > > > > > > > > > > > > > > > > > > > > > > The last crash failure scenario is a natural > transition > > > to > > > > > > > > > > > > > > > > > > > > > > > > how should Streams know what to write into the > > checkpoint > > > > > file > > > > > > > > > > > >> after the crash? > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > As mentioned above, the Streams app writes the > > checkpoint > > > > > file > > > > > > > > after > > > > > > > > > > the > > > > > > > > > > > > Kafka transaction and then the StateStore commit. > Same > > as > > > > > > without > > > > > > > > the > > > > > > > > > > > > proposal, it should write the committed offset, as it > > is > > > > the > > > > > > same > > > > > > > > for > > > > > > > > > > > both > > > > > > > > > > > > the Kafka changelog and the state store. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> This issue arises because we store the offset > outside > > of > > > > the > > > > > > > state > > > > > > > > > > > >> store. Maybe we need an additional method on the > state > > > > store > > > > > > > > > interface > > > > > > > > > > > >> that returns the offset at which the state store is. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > In my opinion, we should include in the interface > only > > > the > > > > > > > > guarantees > > > > > > > > > > > that > > > > > > > > > > > > are necessary to preserve EOS without wiping the > local > > > > state. > > > > > > > This > > > > > > > > > way, > > > > > > > > > > > we > > > > > > > > > > > > allow more room for possible implementations. 
Thanks to the idempotency of the changelog replay, it is "good enough" if StateStore#recover returns an offset that is less than what it actually is. The only limitation here is that the state store should never commit writes that are not yet committed in the Kafka changelog.

Please let me know what you think about this. First of all, I am relatively new to the codebase, so I might be wrong in my understanding of how it works. Second, while writing this, it occurred to me that the StateStore#recover interface method is not as straightforward as it could be. Maybe we can change it like that:

    /**
     * Recover a transactional state store
     * <p>
     * If a transactional state store shut down with a crash failure, this
     * method ensures that the state store is in a consistent state that
     * corresponds to {@code changelogOffset} or later.
     *
     * @param changelogOffset the checkpointed changelog offset.
     * @return {@code true} if recovery succeeded, {@code false} otherwise.
     */
    boolean recover(final Long changelogOffset) {

Note: all links below except for [10] lead to the prototype's code.
1. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L468
2. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L580
3. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L868
4. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L94-L96
5.
https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L213-L216
6. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L94-L97
7. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L469
8. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L226
9. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L103
10.
https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L251-L252
11. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L250-L265
12. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java#L84-L88

Best,
Alex

On Fri, Jul 29, 2022 at 3:42 PM Bruno Cadonna <cado...@apache.org> wrote:

Hi Alex,

Thanks for the updates!

1. Don't you want to deprecate StateStore#flush()? As far as I understand, commit() is the new flush(), right? If you do not deprecate it, you don't get rid of the room for error you describe in your KIP by having both a flush() and a commit().

2. I would shorten Materialized#withTransactionalityEnabled() to Materialized#withTransactionsEnabled().

3. Could you also describe in a bit more detail where the offsets passed into commit() and recover() come from?

For my next two points, I need the commit workflow that you were so kind to post into this thread:

1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
3. flush (<- that would be a call to commit(), right?)
4. checkpoint

4. I do not quite understand how a state store can roll forward. You mention in the thread the following:

> If the crash failure happens during #3, the state store can roll forward and finish the flush/commit.

How does the state store know where it stopped the flushing when it crashed?

This seems an optimization to me. I think in general the state store should roll back to the last successfully committed state and restore from there until the end of the changelog topic partition. The last committed state is the offsets in the checkpoint file.

5. In the same e-mail from point 4, you also state:

> If the crash failure happens between #3 and #4, the state store should do nothing during recovery and just proceed with the checkpoint.

How should Streams know that the failure was between #3 and #4 during recovery? It just sees a valid state store and a valid checkpoint file. Streams does not know that the state of the checkpoint file does not match the committed state of the state store. Also, how should Streams know what to write into the checkpoint file after the crash? This issue arises because we store the offset outside of the state store. Maybe we need an additional method on the state store interface that returns the offset at which the state store is.

Best,
Bruno

On 27.07.22 11:51, Alexander Sorokoumov wrote:

Hey Nick,

Thank you for the kind words and the feedback!
I'll definitely add an option to configure the transactional mechanism in the Stores factory method via an argument, as John previously suggested, and might add the in-memory option via RocksDB Indexed Batches if I figure out why their creation via the rocksdb jni fails with `UnsatisfiedLinkException`.

Best,
Alex

On Wed, Jul 27, 2022 at 11:46 AM Alexander Sorokoumov <asorokou...@confluent.io> wrote:

Hey Guozhang,

> 1) About the param passed into the `recover()` function: it seems to me that the semantics of "recover(offset)" is: recover this state to a transaction boundary which is at least the passed-in offset. And the only possibility that the returned offset is different than the passed-in offset is if the previous failure happens after we've done all the commit procedures except writing the new checkpoint, in which case the returned offset would be larger than the passed-in offset. Otherwise it should always be equal to the passed-in offset, is that right?

Right now, the only case when `recover` returns an offset different from the passed one is when the failure happens *during* commit.

If the failure happens after commit but before the checkpoint, `recover` might return either the passed offset or a newer committed offset, depending on the implementation. The `recover` implementation in the prototype returns the passed offset because it deletes the commit marker that holds that offset after the commit is done. In that case, the store will replay the last commit from the changelog. I think that is fine, as the changelog replay is idempotent.

> 2) It seems the only use for the "transactional()" function is to determine if we can update the checkpoint file while in EOS.

Right now, there are 2 other uses for `transactional()`:
1. To determine what to do during initialization if the checkpoint is gone (see [1]). If the state store is transactional, we don't have to wipe the existing data. Thinking about it now, we do not really need this check of whether the store is `transactional`, because if it is not, we'd not have written the checkpoint in the first place. I am going to remove that check.
2. To determine if the persistent kv store in KStreamImplJoin should be transactional (see [2], [3]).

I am not sure if we can get rid of the checks in point 2. If so, I'd be happy to encapsulate the `transactional()` logic in `commit/recover`.

Best,
Alex

1. https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
2.
https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
3. https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354

On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <nick.telf...@gmail.com> wrote:

Hi Alex,

Excellent proposal, I'm very keen to see this land!

Would it be useful to permit configuring the type of store used for uncommitted offsets on a store-by-store basis? This way, users could choose whether to use, e.g., an in-memory store or RocksDB, potentially reducing the overheads associated with RocksDB for smaller stores, but without the memory pressure issues?

I suspect that in most cases, the number of uncommitted records will be very small, because the default commit interval is 100ms.

Regards,

Nick

On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Alex,

Thanks for the updated KIP. I looked over it and browsed the WIP and just have a couple of meta thoughts:

1) About the param passed into the `recover()` function: it seems to me that the semantics of "recover(offset)" is: recover this state to a transaction boundary which is at least the passed-in offset. And the only possibility that the returned offset is different than the passed-in offset is if the previous failure happens after we've done all the commit procedures except writing the new checkpoint, in which case the returned offset would be larger than the passed-in offset. Otherwise it should always be equal to the passed-in offset, is that right?
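A toy model of the recover(offset) semantics described in 1), with an illustrative `lastCommittedOffset` field standing in for whatever transaction boundary the store actually persisted (these names are made up, not the KIP's API):

```java
// Toy model: recover() rolls the store to a consistent transaction
// boundary at least as new as the checkpointed offset. The returned
// offset exceeds the passed-in one only when the previous run finished
// the store commit but crashed before updating the checkpoint file.
class RecoverSemantics {
    final long lastCommittedOffset; // boundary the store actually holds

    RecoverSemantics(final long lastCommittedOffset) {
        this.lastCommittedOffset = lastCommittedOffset;
    }

    long recover(final long checkpointedOffset) {
        // Never roll back behind a commit the store already completed.
        return Math.max(checkpointedOffset, lastCommittedOffset);
    }
}
```

With a clean shutdown the two offsets agree and recover() returns the checkpointed offset; after a crash between store commit and checkpoint, the store is ahead and reports the larger offset.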
2) It seems the only use for the "transactional()" function is to determine if we can update the checkpoint file while in EOS. But the purpose of the checkpoint file's offsets is just to tell "the local state's current snapshot's progress is at least the indicated offsets" anyways, and with this KIP maybe we would just do:

a) when in ALOS, upon failover: we set the starting offset as the checkpointed offset, then restore() from the changelog till the end-offset. This way we may restore some records twice.
b) when in EOS, upon failover: we first call recover(checkpointed-offset), then set the starting offset as the returned offset (which may be larger than the checkpointed offset), then restore until the end-offset.

So why not also:

c) we let the `commit()` function also return an offset, which indicates "checkpointable offsets".
d) for existing non-transactional stores, we just have a default implementation of "commit()" which is simply a flush and returns a sentinel value like -1. Then later, if we get a checkpointable offset of -1, we do not write the checkpoint. Upon clean shutdown we can just checkpoint regardless of the value returned from "commit".
e) for existing non-transactional stores, we just have a default implementation of "recover()" which is to wipe out the local store and return offset 0 if the passed-in offset is -1; otherwise, if not -1, it indicates a clean shutdown in the last run, and this function is just a no-op.

In that case, we would not need the "transactional()" function anymore, since for non-transactional stores their behaviors are still wrapped in the `commit / recover` function pairs.

I have not completed a thorough pass on your WIP PR, so maybe I can come up with some more feedback later, but just let me know if my understanding above is correct or not?

Guozhang

On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hi,

I updated the KIP with the following changes:
* Replaced in-memory batches with the secondary-store approach as the default implementation, to address the feedback about memory pressure as suggested by Sagar and Bruno.
* Introduced StateStore#commit and StateStore#recover methods as an extension of the rollback idea. @Guozhang, please see the comment below on why I took a slightly different approach than you suggested.
* Removed mentions of changes to IQv1 and IQv2.
Transactional state stores enable reading committed in IQ, but it is really an independent feature that deserves its own KIP. Conflating them unnecessarily increases the scope for discussion, implementation, and testing in a single unit of work.

I also published a prototype - https://github.com/apache/kafka/pull/12393 - that implements the changes described in the proposal.

Regarding explicit rollback, I think it is a powerful idea that allows other StateStore implementations to take a different path to transactional behavior rather than keeping 2 state stores. Instead of introducing a new commit token, I suggest using a changelog offset that already 1:1 corresponds to the materialized state. This works nicely because Kafka Streams first commits an AK transaction and only then checkpoints the state store, so we can use the changelog offset to commit the state store transaction.

I called the method StateStore#recover rather than StateStore#rollback because a state store might either roll back or forward depending on the specific point of the crash failure. Consider the write algorithm in Kafka Streams:
1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
3. flush
4. checkpoint

Let's consider 3 cases:
1. If the crash failure happens between #2 and #3, the state store rolls back and replays the uncommitted transaction from the changelog.
2. If the crash failure happens during #3, the state store can roll forward and finish the flush/commit.
3. If the crash failure happens between #3 and #4, the state store should do nothing during recovery and just proceed with the checkpoint.

Looking forward to your feedback,
Alexander

On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <asorokou...@confluent.io> wrote:

Hi,

As a status update, I did the following changes to the KIP:
* replaced configuration via the top-level config with configuration via the Stores factory and StoreSuppliers,
* added IQv2 and elaborated on how readCommitted will work when the store is not transactional,
* removed claims about ALOS.

I am going to be OOO for the next couple of weeks and will resume working on the proposal and responding to the discussion in this thread starting June 27. My next top priorities are:
1. Prototype the rollback approach as suggested by Guozhang.
2. Replace in-memory batches with the secondary-store approach as the default implementation, to address the feedback about memory pressure as suggested by Sagar and Bruno.
3. Adjust the Stores methods to make transactional implementations pluggable.
4. Publish the POC for the first review.

Best regards,
Alex

On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com> wrote:

Alex,

Thanks for your replies! That is very helpful.

Just to broaden our discussions a bit here, I think there are some other approaches in parallel to the idea of "enforce to only persist upon explicit flush", and I'd like to throw one here -- not really advocating it, but just for us to compare the pros and cons:

1) We let the StateStore's `flush` function return a token instead of returning `void`.
> > > > > > > > > > > >>>>>>>>> 2) We add another `rollback(token)` interface > > of > > > > > > > StateStore > > > > > > > > > > which > > > > > > > > > > > >>>>>> would > > > > > > > > > > > >>>>>>>>> effectively rollback the state as indicated > by > > > the > > > > > > token > > > > > > > to > > > > > > > > > the > > > > > > > > > > > >>>>>> snapshot > > > > > > > > > > > >>>>>>>>> when the corresponding `flush` is called. > > > > > > > > > > > >>>>>>>>> 3) We encode the token and commit as part of > > > > > > > > > > > >>>>>>>>> `producer#sendOffsetsToTransaction`. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> Users could optionally implement the new > > > functions, > > > > > or > > > > > > > they > > > > > > > > > can > > > > > > > > > > > >>>>> just > > > > > > > > > > > >>>>>> not > > > > > > > > > > > >>>>>>>>> return the token at all and not implement the > > > > second > > > > > > > > > function. > > > > > > > > > > > >>>>> Again, > > > > > > > > > > > >>>>>>> the > > > > > > > > > > > >>>>>>>>> APIs are just for the sake of illustration, > not > > > > > feeling > > > > > > > > they > > > > > > > > > > are > > > > > > > > > > > >>>>> the > > > > > > > > > > > >>>>>>> most > > > > > > > > > > > >>>>>>>>> natural :) > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> Then the procedure would be: > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> 1. the previous checkpointed offset is 100 > > > > > > > > > > > >>>>>>>>> ... > > > > > > > > > > > >>>>>>>>> 3. flush store, make sure all writes are > > > persisted; > > > > > get > > > > > > > the > > > > > > > > > > > >>>>> returned > > > > > > > > > > > >>>>>>> token > > > > > > > > > > > >>>>>>>>> that indicates the snapshot of 200. > > > > > > > > > > > >>>>>>>>> 4. producer.sendOffsetsToTransaction(token); > > > > > > > > > > > >>>>>>> producer.commitTransaction(); > > > > > > > > > > > >>>>>>>>> 5. 
Update the checkpoint file (say, the new > > value > > > > is > > > > > > > 200). > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> Then if there's a failure, say between 3/4, > we > > > > would > > > > > > get > > > > > > > > the > > > > > > > > > > > token > > > > > > > > > > > >>>>>> from > > > > > > > > > > > >>>>>>>>> the > > > > > > > > > > > >>>>>>>>> last committed txn, and first we would do the > > > > > > restoration > > > > > > > > > > (which > > > > > > > > > > > >>>>> may > > > > > > > > > > > >>>>>> get > > > > > > > > > > > >>>>>>>>> the state to somewhere between 100 and 200), > > then > > > > > call > > > > > > > > > > > >>>>>>>>> `store.rollback(token)` to rollback to the > > > snapshot > > > > > of > > > > > > > > offset > > > > > > > > > > > 100. > > > > > > > > > > > >>>>>>>>> > > > > > > > > > > > >>>>>>>>> The pros is that we would then not need to > > > enforce > > > > > the > > > > > > > > state > > > > > > > > > > > >>>>> stores to > > > > > > > > > > > >>>>>>> not > > > > > > > > > > > >>>>>>>>> persist any data during the txn: for stores > > that > > > > may > > > > > > not > > > > > > > be > > > > > > > > > > able > > > > > > > > > > > to > > > > > > > > > > > >>>>>>>>> implement the `rollback` function, they can > > still > > > > > > reduce > > > > > > > > its > > > > > > > > > > impl > > > > > > > > > > > >>>>> to > > > > > > > > > > > >>>>>>> "not > > > > > > > > > > > >>>>>>>>> persisting any data" via this API, but for > > stores > > > > > that > > > > > > > can > > > > > > > > > > indeed > > > > > > > > > > > >>>>>>> support > > > > > > > > > > > >>>>>>>>> the rollback, their implementation may be > more > > > > > > efficient. 
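As a toy illustration of the sketched API, a minimal in-memory version might look like the following. All class and method names here are hypothetical, invented for this sketch; none of this is proposed Kafka Streams code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the token idea: flush() persists all writes and returns a
// token identifying the snapshot; rollback(token) restores that snapshot
// during recovery. Names are illustrative only.
public class TokenRollbackSketch {
    static class SnapshottingStore {
        private final Map<String, String> data = new HashMap<>();
        private final Map<Long, Map<String, String>> snapshots = new HashMap<>();
        private long nextToken = 0;

        void put(String k, String v) { data.put(k, v); }
        String get(String k) { return data.get(k); }

        // 1) flush returns a token instead of void.
        long flush() {
            long token = nextToken++;
            snapshots.put(token, new HashMap<>(data));
            return token;
        }

        // 2) rollback(token) restores the snapshot taken by that flush.
        void rollback(long token) {
            data.clear();
            data.putAll(snapshots.get(token));
        }
    }

    public static void main(String[] args) {
        SnapshottingStore store = new SnapshottingStore();
        store.put("k", "v-at-100");
        long token = store.flush(); // snapshot of the last committed txn

        // Writes of the next transaction; its commit never happens.
        store.put("k", "v-at-200");

        // Recovery: roll back to the last committed snapshot before
        // resuming processing from offset 100.
        store.rollback(token);
        System.out.println(store.get("k")); // prints "v-at-100"
    }
}
```

A store that cannot take cheap snapshots could still implement this contract by simply not persisting anything until flush, as noted above.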
The cons, off the top of my head, are: 1) more complicated logic differentiating between EOS with and without store rollback support, and ALOS; 2) encoding the token as part of the commit offset is not ideal if it is big; 3) the recovery logic, including the state store's part, is also a bit more complicated.

Guozhang

On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hi Guozhang,

> But I'm still trying to clarify how it guarantees EOS, and it seems that we would achieve it by enforcing to not persist any data written within this transaction until step 4. Is that correct?

This is correct.
Both alternatives - the in-memory WriteBatchWithIndex and transactionality via the secondary store - guarantee EOS by not persisting data in the "main" state store until it is committed in the changelog topic.

> Oh what I meant is not what KStream code does, but that StateStore impl classes themselves could potentially flush data to become persisted asynchronously

Thank you for elaborating! You are correct, the underlying state store should not persist data until the streams app calls StateStore#flush. There are two ways a StateStore implementation can guarantee that - either keep uncommitted writes in memory, or be able to roll back the changes that were not committed during recovery. RocksDB's WriteBatchWithIndex is an implementation of the first option.
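As a rough model of how an indexed write batch keeps uncommitted writes in memory while still serving read-your-own-writes, here is a toy sketch. It is not the real org.rocksdb.WriteBatchWithIndex API, just an illustration of the mechanism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an indexed write batch: every uncommitted operation is
// appended to an ordered batch AND indexed so that reads can see their own
// writes; flush() applies the batch to the underlying store in order.
public class WriteBatchSketch {
    static class Op {
        final String key;
        final String value; // null means delete
        Op(String key, String value) { this.key = key; this.value = value; }
    }

    static class IndexedBatch {
        final List<Op> ops = new ArrayList<>();        // ordered batch
        final Map<String, Op> index = new HashMap<>(); // latest op per key

        void put(String k, String v) { add(new Op(k, v)); }
        void delete(String k) { add(new Op(k, null)); }
        private void add(Op op) { ops.add(op); index.put(op.key, op); }

        // Reads overlay the batch on the underlying store.
        String get(Map<String, String> store, String k) {
            Op op = index.get(k);
            if (op != null) return op.value; // may be null (deleted)
            return store.get(k);
        }

        // Until flush, nothing is persisted; all ops live in memory,
        // which is the memory-pressure concern raised in this thread.
        void flush(Map<String, String> store) {
            for (Op op : ops) {
                if (op.value == null) store.remove(op.key);
                else store.put(op.key, op.value);
            }
            ops.clear();
            index.clear();
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("a", "old");
        IndexedBatch batch = new IndexedBatch();
        batch.put("a", "new");
        System.out.println(batch.get(store, "a")); // "new" (from the batch)
        System.out.println(store.get("a"));        // "old" (not persisted yet)
        batch.flush(store);
        System.out.println(store.get("a"));        // "new"
    }
}
```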
A considered alternative, Transactions via Secondary State Store for Uncommitted Changes, is the way to implement the second option.

As everyone correctly pointed out, keeping uncommitted data in memory introduces a very real risk of OOM that we will need to handle. The more I think about it, the more I lean towards going with Transactions via Secondary Store as the way to implement transactionality, as it does not have that issue.

Best,
Alex

On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Alex,

> we flush the cache, but not the underlying state store.

You're right.
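A toy model of the secondary-store idea, with hypothetical names (the KIP's actual classes may differ): uncommitted writes go to a temporary store, commit copies them into the main store, and recovery simply drops them.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of "Transactions via Secondary State Store for Uncommitted
// Changes": writes land in a secondary store, commit copies them into the
// main store, and crash recovery truncates the secondary store. Because
// the secondary store can be persistent, there is no OOM risk from
// buffering uncommitted writes in memory.
public class SecondaryStoreSketch {
    static class TwoStoreTxn {
        final Map<String, String> mainStore = new HashMap<>(); // committed
        final Map<String, String> tmpStore = new HashMap<>();  // uncommitted

        void put(String k, String v) { tmpStore.put(k, v); }

        // Reads overlay uncommitted writes on the committed state.
        String get(String k) {
            return tmpStore.containsKey(k) ? tmpStore.get(k) : mainStore.get(k);
        }

        // On commit, copy uncommitted writes into the main store. This
        // per-commit copy is the extra work this approach pays.
        void commit() {
            mainStore.putAll(tmpStore);
            tmpStore.clear();
        }

        // On crash recovery, uncommitted writes are simply discarded.
        void recover() { tmpStore.clear(); }
    }

    public static void main(String[] args) {
        TwoStoreTxn store = new TwoStoreTxn();
        store.put("k", "committed");
        store.commit();
        store.put("k", "uncommitted");
        store.recover(); // crash before the txn committed
        System.out.println(store.get("k")); // "committed"
    }
}
```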
The ordering I mentioned above is actually:

...
3. producer.sendOffsetsToTransaction(); producer.commitTransaction();
4. Flush the store, making sure all writes are persisted.
5. Update the checkpoint file to 200.

But I'm still trying to clarify how it guarantees EOS, and it seems that we would achieve it by enforcing to not persist any data written within this transaction until step 4. Is that correct?

> Can you please point me to the place in the codebase where we trigger async flush before the commit?

Oh what I meant is not what KStream code does, but that StateStore impl classes themselves could potentially flush data to become persisted asynchronously; e.g. RocksDB does that naturally, out of the control of KStream code.
I think it is related to my previous question: if, to guarantee EOS at the state store level, we effectively ask the impl classes "you should not persist any data until `flush` is called explicitly", is the StateStore interface the right level to enforce such mechanisms, or should we just do that on top of the StateStores? E.g. during the transaction we keep all the writes in the cache (of course we need to consider how to work around memory pressure, as previously mentioned), and then upon committing, we write the cached records as a whole into the store and then call flush.
Guozhang

On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hey,

Thank you for the wealth of great suggestions and questions! I am going to address the feedback in batches and update the proposal async, as it is probably going to be easier for everyone. I will also write a separate message after making updates to the KIP.

@John,

> Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores?

Thank you for suggesting that.
I think that this idea is better than what I came up with and will update the KIP with configuring transactionality via the suppliers and Stores.

> what is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?

Can you point me to RecordCache? I can't find it in the project. The advantage would be that WriteBatch guarantees write atomicity. As far as I understood the way RecordCache works, it might leave the system in an inconsistent state during a crash failure on write.

> You mentioned that a transactional store can help reduce duplication in the case of ALOS

I will remove claims about ALOS from the proposal.
Thank you for elaborating!

> As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.

I will update the proposal with complementary API changes for IQv2.

> What should IQ do if I request to readCommitted on a non-transactional store?

We can assume that non-transactional stores commit on write, so IQ works in the same way with non-transactional stores regardless of the value of readCommitted.
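That rule could be modeled roughly as follows; the readCommitted flag and method shapes here are hypothetical illustrations inferred from this thread, not the actual IQ API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of readCommitted semantics: a transactional store
// distinguishes committed from uncommitted values, while a
// non-transactional store "commits on write", so readCommitted does not
// change what it returns.
public class ReadCommittedSketch {
    static class Store {
        final boolean transactional;
        final Map<String, String> committed = new HashMap<>();
        final Map<String, String> uncommitted = new HashMap<>();

        Store(boolean transactional) { this.transactional = transactional; }

        void put(String k, String v) {
            // Non-transactional stores commit immediately on write.
            (transactional ? uncommitted : committed).put(k, v);
        }

        void commit() {
            committed.putAll(uncommitted);
            uncommitted.clear();
        }

        String get(String k, boolean readCommitted) {
            if (transactional && !readCommitted && uncommitted.containsKey(k)) {
                return uncommitted.get(k);
            }
            return committed.get(k);
        }
    }

    public static void main(String[] args) {
        Store txn = new Store(true);
        txn.put("k", "v1");
        txn.commit();
        txn.put("k", "v2"); // not yet committed
        System.out.println(txn.get("k", true));  // "v1"
        System.out.println(txn.get("k", false)); // "v2"

        Store plain = new Store(false);
        plain.put("k", "v2");
        // readCommitted is a no-op for a non-transactional store:
        System.out.println(plain.get("k", true)); // "v2"
    }
}
```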
@Guozhang,

> * If we crash between line 3 and 4, then at that time the local persistent store image is representing as of offset 200, but upon recovery all changelog records from 100 to log-end-offset would be considered as aborted and not be replayed and we would restart processing from position 100. Restart processing will violate EOS. I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that step 4 and step 5 could be done atomically here.

Could you please point me to the place in the codebase where a task flushes the store before committing the transaction?
Looking at TaskExecutor (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
), StreamTask#prepareCommit (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
), and CachedStateStore (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
), we flush the cache, but not the underlying state store. Explicit StateStore#flush happens in AbstractTask#maybeWriteCheckpoint (
https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
).
Is there something I am missing here?

> Today all cached data that have not been flushed are not committed for sure, but even flushed data to the persistent underlying store may also be uncommitted since flushing can be triggered asynchronously before the commit.
Can you please point me to the place in the codebase where we trigger async flush before the commit? This would certainly be a reason to introduce a dedicated StateStore#commit method.

Thanks again for the feedback. I am going to update the KIP and then respond to the next batch of questions and suggestions.

Best,
Alex

On Mon, May 30, 2022 at 5:13 PM Suhas Satish <ssat...@confluent.io.invalid> wrote:

Thanks for the KIP proposal Alex.

1. Configuration default

You mention applications using the Streams DSL with the built-in RocksDB state store will get transactional state stores by default when EOS is enabled, but the default implementation for apps using the PAPI will fall back to non-transactional behavior. Shouldn't we have the same default behavior for both types of apps - DSL and PAPI?

On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <cado...@apache.org> wrote:

Thanks for the PR, Alex!

I am also glad to see this coming.

1. Configuration

I would also prefer to restrict the configuration of transactionality to the state store.
Ideally, calling the method transactional() on the state store would be enough. An option on the store builder would make it possible to turn transactionality on and off (as John proposed).

2. Memory usage in RocksDB

This seems to be a major issue. We do not have any guarantee that uncommitted writes fit into memory, and I guess we never will. What happens when the uncommitted writes do not fit into memory? Does RocksDB throw an exception? Can we handle such an exception without crashing?

Does the RocksDB behavior even need to be included in this KIP? In the end it is an implementation detail.
What we should consider - though - is a memory limit in some form, and what we do when the memory limit is exceeded.

3. PoC

I agree with Guozhang that a PoC is a good idea to better understand the devils in the details.

Best,
Bruno

On 25.05.22 01:52, Guozhang Wang wrote:

Hello Alex,

Thanks for writing the proposal! Glad to see it coming. I think this is the kind of KIP where too many devils are buried in the details, so it's better to start working on a POC, either in parallel with or before we resume our discussion, rather than blocking any implementation until we are satisfied with the proposal.

Just as a concrete example, I personally am still not 100% clear how the proposal would work to achieve EOS with the state stores.
For example, the commit procedure today looks like this:

0. There's an existing checkpoint file indicating that the changelog offset of the local state store image is 100. Now a commit is triggered:
1. Flush the cache (since it contains partially processed records), making sure all records are written to the producer.
2. Flush the producer, making sure all changelog records have now been acked. // here we would get the new changelog position, say 200
3. Flush the store, making sure all writes are persisted.
4. producer.sendOffsetsToTransaction(); producer.commitTransaction(); // we would make the writes in the changelog up to offset 200 committed
5. Update the checkpoint file to 200.
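To make the ordering concrete, the five steps above can be sketched as follows. All types here are simplified stand-ins for the real Streams internals (the cache, record collector, and checkpoint file), not the actual classes; the offsets 100 and 200 are the ones from the example.

```java
// Sketch of today's commit sequence in Streams, following the numbered steps
// above. Every class here is a simplified stand-in, not a real Streams type.
public class CommitSketch {
    static class Cache { void flush() {} }            // stand-in for the record cache
    static class Producer {
        long flush() { return 200L; }                 // 2. returns the acked changelog position
        void sendOffsetsToTransaction() {}            // 4a.
        void commitTransaction() {}                   // 4b.
    }
    static class Store { void flush() {} }            // stand-in for the local RocksDB store

    /** Runs steps 1-5 and returns the updated checkpoint offset. */
    static long commit(Cache cache, Producer producer, Store store, long checkpoint) {
        cache.flush();                                // 1. flush cache to the producer
        long committedOffset = producer.flush();      // 2. flush producer -> new position, say 200
        store.flush();                                // 3. persist all local writes
        producer.sendOffsetsToTransaction();          // 4. commit the Kafka transaction
        producer.commitTransaction();
        return committedOffset;                       // 5. checkpoint moves 100 -> 200
    }

    public static void main(String[] args) {
        long checkpoint = 100L;                       // 0. existing checkpoint
        checkpoint = commit(new Cache(), new Producer(), new Store(), checkpoint);
        System.out.println("checkpoint=" + checkpoint);
    }
}
```

The crash scenarios discussed next fall out of the gaps between these calls: any failure between two steps leaves the store, changelog, and checkpoint describing different offsets.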
The question is about atomicity between those steps. For example:

* If we crash between step 4 and step 5, the local checkpoint file would stay at 100, and upon recovery we would replay the changelog from 100 to 200. This is not ideal but does not violate EOS, since the changelog records are all overwrites anyway.
* If we crash between step 3 and step 4, then at that time the local persistent store image represents offset 200, but upon recovery all changelog records from 100 to the log-end-offset would be considered aborted and not be replayed, and we would restart processing from position 100.
Restarting processing from there would violate EOS. I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that step 4 and step 5 could be done atomically here.

Originally, what I was thinking when creating the JIRA ticket is that we need to let the state store provide a transactional API like "token commit()", used in step 4 above, which returns a token that, e.g. in our example above, indicates offset 200, and that token would be written as part of the records in the Kafka transaction in step 5.
And upon recovery the state store would have another API like "rollback(token)", where the token is read from the latest committed txn and used to roll the store back to that committed image. I think your proposal is different: it seems like you're proposing we swap steps 3 and 4 above, but the atomicity issue still remains, since now you may have the store image at 100 but the changelog committed at 200. I'd like to learn more about the details of how it resolves such issues.
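A minimal sketch of what such a token-based commit/rollback API could look like, with an in-memory stand-in for the store. The class and method names are hypothetical illustrations of the idea described above, not part of any released Streams API, and the token here simply encodes the changelog offset.

```java
// Hypothetical token-based transactional store, per the description above.
// All names (TokenStoreSketch, commit, rollback) are illustrative only.
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class TokenStoreSketch {
    private final Map<String, String> committedImage = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();

    void put(String key, String value) { uncommitted.put(key, value); }

    /** Atomically apply pending writes and return a token (here: the new
     *  changelog offset, e.g. 200) to embed in the Kafka transaction. */
    byte[] commit(long newChangelogOffset) {
        committedImage.putAll(uncommitted);
        uncommitted.clear();
        return ByteBuffer.allocate(Long.BYTES).putLong(newChangelogOffset).array();
    }

    /** On recovery: read the token from the latest committed txn and roll the
     *  store back to that committed image, discarding uncommitted writes. */
    long rollback(byte[] token) {
        uncommitted.clear();
        return ByteBuffer.wrap(token).getLong(); // offset of the restored image
    }
}
```

In this shape, the atomicity of step 4 and step 5 comes from the token riding inside the same Kafka transaction as the offsets, so either both commit or neither does.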
Anyways, that's just an example to make the point that there are lots of implementation details which would drive the public API design, and we should probably first do a PoC and then come back to discuss the KIP. Let me know what you think?

Guozhang

On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com> wrote:
Hi Alexander,

Thanks for the KIP! This seems like a great proposal.
I have the same opinion as John on the configuration part, though. I think the two-level config, and its behaviour based on setting/unsetting the flag, seems confusing to me as well. Since the KIP seems specifically centred around RocksDB, it might be better to add it at the Supplier level, as John suggested.

On similar lines, the config name *statestore.transactional.mechanism* may also need rethinking, as the value assigned to it (rocksdb_indexbatch) implicitly seems to assume that RocksDB is the only state store that Kafka Streams supports, while that's not the case.
Also, regarding the potential memory pressure that can be introduced by WriteBatchWithIndex, do you think it might make more sense to include some numbers/benchmarks on how much the memory consumption might increase?

Lastly, the read_uncommitted flag's behaviour on IQ may need more elaboration.

These points aside, as I said, this is a great proposal!

Thanks!
Sagar.

On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:
Thanks for the KIP, Alex!

I'm really happy to see your proposal. This improvement fills a long-standing gap.
I have a few questions:

1. Configuration
The KIP only mentions RocksDB, but of course, Streams also ships with an InMemory store, and users also plug in their own custom state stores. It is also common to use multiple types of state stores in the same application for different purposes.

Against this backdrop, the choice to configure transactionality as a top-level config, as well as to configure the store transaction mechanism as a top-level config, seems a bit off.
Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores? It seems like the desire to enable the feature by default, but with a feature flag to disable it, was a factor here. However, as you pointed out, there are some major considerations that users should be aware of, so opt-in doesn't seem like a bad choice, either.
You could add an Enum argument to those factories like `RocksDBTransactionalMechanism.{NONE, ...}`.

Some points in favor of this approach:
* Avoid the "stores that don't support transactions ignore the config" complexity
* Users can choose how to spend their memory budget, making some stores transactional and others not
* When we add transactional support to in-memory stores, we don't have to figure out what to do with the mechanism config (i.e., what do you set the mechanism to when there are multiple kinds of transactional stores in the topology?)
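For illustration, such an enum might look like the sketch below. Only NONE appears in the message above; the WRITE_BATCH_WITH_INDEX value and the factory call shown in the usage note are assumptions based on the mechanism discussed elsewhere in this thread, not an agreed API.

```java
// Illustrative only: a per-store opt-in enum passed to the Stores factories.
// NONE comes from the suggestion above; WRITE_BATCH_WITH_INDEX is assumed
// from the RocksDB mechanism discussed in this thread.
public enum RocksDBTransactionalMechanism {
    NONE,                   // keep the current, non-transactional behavior
    WRITE_BATCH_WITH_INDEX  // buffer uncommitted writes in a WriteBatchWithIndex
}
```

A hypothetical factory overload could then look like `Stores.persistentKeyValueStore("my-store", RocksDBTransactionalMechanism.WRITE_BATCH_WITH_INDEX)`, letting each store in the topology opt in independently of the others.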
2. Caching/flushing/transactions
The coupling between memory usage and flushing that you mentioned is a bit troubling. It also occurs to me that there seems to be some relationship with the existing record cache, which is also an in-memory holding area for records that are not yet written to the changelog and/or store (albeit with no particular semantics). Have you considered how all these components should relate? For example, should a "full" WriteBatch actually trigger a flush so that we don't get OOMEs?
If the proposed transactional mechanism forces all uncommitted writes to be buffered in memory until a commit, then what is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?

3. ALOS
You mentioned that a transactional store can help reduce duplication in the case of ALOS. We might want to be careful about claims like that. Duplication isn't the way that repeated processing manifests in state stores; rather, it is in the form of dirty reads during reprocessing.
This feature may reduce the incidence of dirty reads during reprocessing, but not in a predictable way. During regular processing today, we will send some records through to the changelog in between commit intervals. Under ALOS, if any of those dirty writes gets committed to the changelog topic, then upon failure we have to roll the store forward to them anyway, regardless of this new transactional mechanism. That's a fixable problem, by the way, but this KIP doesn't seem to fix it.
I wonder if we should make any claims about the relationship of this feature to ALOS if the real-world behavior is so complex.

4. IQ
As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.
Regarding your proposal for IQv1, I'm unsure what the behavior should be for readCommitted, since the current behavior also reads out of the RecordCache. I guess if readCommitted==false, then we will continue to read from the cache first, then the Batch, then the store; and if readCommitted==true, we would skip the cache and the Batch and only read from the persistent RocksDB store?
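The two read paths described above can be made concrete with a small sketch. The three maps are stand-ins for the record cache, the uncommitted WriteBatchWithIndex, and the committed RocksDB store; the layering shown is one possible interpretation of the question, not settled behavior.

```java
// Sketch of the layered IQv1 read path under discussion. Plain maps stand in
// for the record cache, the uncommitted batch, and the committed store.
import java.util.HashMap;
import java.util.Map;

public class LayeredReadSketch {
    final Map<String, String> cache = new HashMap<>(); // RecordCache (dirty)
    final Map<String, String> batch = new HashMap<>(); // uncommitted WriteBatchWithIndex
    final Map<String, String> store = new HashMap<>(); // committed RocksDB image

    String get(String key, boolean readCommitted) {
        if (!readCommitted) {
            // readCommitted==false: cache first, then batch, then store
            if (cache.containsKey(key)) return cache.get(key);
            if (batch.containsKey(key)) return batch.get(key);
        }
        // readCommitted==true: skip cache and batch, read only committed data
        return store.get(key);
    }
}
```

Under this layering, the last open question below (readCommitted against a non-transactional store) amounts to asking what the middle layer means when there is no batch at all.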
What should IQ do if I request readCommitted on a non-transactional store?

Thanks again for proposing the KIP, and my apologies for the long reply; I'm hoping to air all my concerns in one "batch" to save time for you.

Thanks,
-John

On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
Hi all,

I've written a KIP for making Kafka Streams state stores transactional and would like to start a discussion:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores

Best,
Alex