Hey Nick,

Thank you for the prototype testing and benchmarking, and sorry for the
late reply!

I agree that it is worth revisiting the WriteBatchWithIndex approach. I
will implement a fork of the current prototype that uses that mechanism to
ensure transactionality and let you know when it is ready for
review/testing in this ML thread.
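
For anyone following along, here is a minimal sketch of the idea behind that mechanism: uncommitted writes sit in an indexed buffer layered over the committed data, reads consult the buffer first, and commit or rollback only touches the buffer. It uses plain in-memory maps rather than the real RocksDB classes, and the names (BufferedTxnStore, put, delete, get, commit, rollback) are made up for illustration; they are not part of the KIP or the prototype.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical illustration only: an in-memory stand-in for a store that
// buffers uncommitted writes in an indexed batch on top of committed data.
class BufferedTxnStore {
    // Committed state; stands in for the main RocksDB instance.
    private final Map<String, String> committed = new TreeMap<>();
    // Uncommitted writes; Optional.empty() marks a pending delete.
    private final Map<String, Optional<String>> batch = new TreeMap<>();

    void put(String key, String value) {
        batch.put(key, Optional.of(value));
    }

    void delete(String key) {
        batch.put(key, Optional.empty());
    }

    // Reads consult the uncommitted batch first, then the committed data.
    String get(String key) {
        Optional<String> pending = batch.get(key);
        if (pending != null) {
            return pending.orElse(null);
        }
        return committed.get(key);
    }

    // Commit applies the buffered writes to the committed state and clears the buffer.
    void commit() {
        for (Map.Entry<String, Optional<String>> e : batch.entrySet()) {
            if (e.getValue().isPresent()) {
                committed.put(e.getKey(), e.getValue().get());
            } else {
                committed.remove(e.getKey());
            }
        }
        batch.clear();
    }

    // Rollback drops the buffer; committed data is untouched.
    void rollback() {
        batch.clear();
    }
}
```

The point of the layering is that a crash or rollback before commit leaves the committed data exactly as it was, with no second store to copy from.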

As for time estimates, I might not have enough time to finish the prototype
in December, so it will probably be ready for review in January.

Best,
Alex

On Fri, Nov 11, 2022 at 4:24 PM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> Sorry to dredge this up again. I've had a chance to start doing some
> testing with the WIP Pull Request, and it appears as though the secondary
> store solution performs rather poorly.
>
> In our testing, we had a non-transactional state store that would restore
> (from scratch), at a rate of nearly 1,000,000 records/second. When we
> switched it to a transactional store, it restored at a rate of less than
> 40,000 records/second.
>
> I suspect the key issues here are having to copy the data out of the
> temporary store and into the main store on-commit, and to a lesser extent,
> the extra memory copies during writes.
>
> I think it's worth re-visiting the WriteBatchWithIndex solution, as it's
> clear from the RocksDB post[1] on the subject that it's the recommended way
> to achieve transactionality.
>
> The only issue you identified with this solution was that uncommitted
> writes are required to entirely fit in-memory, and RocksDB recommends they
> don't exceed 3-4MiB. If we do some back-of-the-envelope calculations, I
> think we'll find that this will be a non-issue for all but the most extreme
> cases, and for those, I think I have a fairly simple solution.
>
> Firstly, when EOS is enabled, the default commit.interval.ms is set to
> 100ms, which provides fairly short intervals that uncommitted writes need
> to be buffered in-memory. If we assume a worst case of 1024 byte records
> (and for most cases, they should be much smaller), then 4MiB would hold
> ~4096 records, which with 100ms commit intervals is a throughput of
> approximately 40,960 records/second. This seems quite reasonable.
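
Spelling out the arithmetic above as a quick check. The class and method names are only for illustration; the 4 MiB cap, the 1024-byte record size, and the 100 ms commit interval are the figures from the paragraph above.

```java
// Hypothetical helper that reproduces the back-of-the-envelope estimate.
class BatchThroughputEstimate {
    // Records that fit into the buffer between two commits.
    static long recordsPerBatch(long batchBytes, long recordBytes) {
        return batchBytes / recordBytes;
    }

    // Sustained records/second if one full batch is committed per interval.
    static long recordsPerSecond(long batchBytes, long recordBytes, long commitIntervalMs) {
        return recordsPerBatch(batchBytes, recordBytes) * 1000 / commitIntervalMs;
    }

    public static void main(String[] args) {
        long batchBytes = 4L * 1024 * 1024; // 4 MiB
        System.out.println(recordsPerBatch(batchBytes, 1024));       // 4096
        System.out.println(recordsPerSecond(batchBytes, 1024, 100)); // 40960
    }
}
```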
>
> For use cases that wouldn't reasonably fit in-memory, my suggestion is that
> we have a mechanism that tracks the number/size of uncommitted records in
> stores, and prematurely commits the Task when this size exceeds a
> configured threshold.
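
A rough sketch of what such a tracker could look like, assuming the store reports key and value sizes on every write. UncommittedBytesTracker and its methods are hypothetical names, not existing Kafka Streams APIs, and the threshold would come from some new config that is not defined anywhere yet.

```java
// Hypothetical tracker; none of these names exist in Kafka Streams.
class UncommittedBytesTracker {
    private final long maxUncommittedBytes;
    private long uncommittedBytes = 0;

    UncommittedBytesTracker(long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    // Record a write; returns true once the buffered size exceeds the
    // threshold, i.e. when the Task should commit early.
    boolean recordWrite(int keyBytes, int valueBytes) {
        uncommittedBytes += (long) keyBytes + valueBytes;
        return uncommittedBytes > maxUncommittedBytes;
    }

    // Called once the Task has committed and the buffer is empty again.
    void onCommit() {
        uncommittedBytes = 0;
    }

    long uncommittedBytes() {
        return uncommittedBytes;
    }
}
```

The caller would check the return value after each write and request a commit when it is true, then call onCommit() once that commit completes.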
>
> Thanks for your time, and let me know what you think!
> --
> Nick
>
> 1: https://rocksdb.org/blog/2015/02/27/write-batch-with-index.html
>
> On Thu, 6 Oct 2022 at 19:31, Alexander Sorokoumov
> <asorokou...@confluent.io.invalid> wrote:
>
> > Hey Nick,
> >
> > It is going to be option c. Existing state is considered to be committed
> > and there will be an additional RocksDB for uncommitted writes.
> >
> > I am out of office until October 24. I will update the KIP and make sure
> > that we have an upgrade test for that after coming back from vacation.
> >
> > Best,
> > Alex
> >
> > On Thu, Oct 6, 2022 at 5:06 PM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I realise this has already been voted on and accepted, but it occurred to
> > > me today that the KIP doesn't define the migration/upgrade path for
> > > existing non-transactional StateStores that *become* transactional, i.e. by
> > > adding the transactional boolean to the StateStore constructor.
> > >
> > > What would be the result, when such a change is made to a Topology, without
> > > explicitly wiping the application state?
> > > a) An error.
> > > b) Local state is wiped.
> > > c) Existing RocksDB database is used as committed writes and new RocksDB
> > > database is created for uncommitted writes.
> > > d) Something else?
> > >
> > > Regards,
> > >
> > > Nick
> > >
> > > On Thu, 1 Sept 2022 at 12:16, Alexander Sorokoumov
> > > <asorokou...@confluent.io.invalid> wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > Sounds good. I annotated all added StateStore methods (commit, recover,
> > > > transactional) with @Evolving.
> > > >
> > > > Best,
> > > > Alex
> > > >
> > > >
> > > >
> > > > On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Alex,
> > > > >
> > > > > Thanks for the detailed replies, I think that makes sense, and in the
> > > > > long run we would need some public indicators from StateStore to
> > > > > determine if checkpoints can really be used to indicate clean snapshots.
> > > > >
> > > > > As for the @Evolving label, I think we can still keep it but for a
> > > > > different reason, since as we add more state management functionalities
> > > > > in the near future we may need to revisit the public APIs again and
> > > > > hence keeping it as @Evolving would allow us to modify if necessary, in
> > > > > an easier path than deprecate -> delete after several minor releases.
> > > > >
> > > > > Besides that, I have no further comments about the KIP.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, Aug 26, 2022 at 1:51 AM Alexander Sorokoumov
> > > > > <asorokou...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hey Guozhang,
> > > > > >
> > > > > >
> > > > > > I think that we will have to keep StateStore#transactional() because
> > > > > > post-commit checkpointing of non-txn state stores will break the
> > > > > > guarantees we want in
> > > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint for
> > > > > > correct recovery. Let's consider checkpoint-recovery behavior under
> > > > > > EOS that we want to support:
> > > > > >
> > > > > > 1. Non-txn state stores should checkpoint on graceful shutdown and
> > > > > > restore from that checkpoint.
> > > > > >
> > > > > > 2. Non-txn state stores should delete local data during recovery
> > > > > > after a crash failure.
> > > > > >
> > > > > > 3. Txn state stores should checkpoint on commit and on graceful
> > > > > > shutdown. These stores should roll back uncommitted changes instead
> > > > > > of deleting all local data.
> > > > > >
> > > > > >
> > > > > > #1 and #2 are already supported; this proposal adds #3. Essentially,
> > > > > > we have two parties at play here - the post-commit checkpointing in
> > > > > > StreamTask#postCommit and recovery in
> > > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint. Together,
> > > > > > these methods must allow all three workflows and prevent invalid
> > > > > > behavior, e.g., non-txn stores should not checkpoint post-commit to
> > > > > > avoid keeping uncommitted data on recovery.
> > > > > >
> > > > > >
> > > > > > In the current state of the prototype, we checkpoint only txn state
> > > > > > stores post-commit under EOS using StateStore#transactional(). If we
> > > > > > remove StateStore#transactional() and always checkpoint post-commit,
> > > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint will have
> > > > > > to determine whether to delete local data. A non-txn implementation
> > > > > > of StateStore#recover can't detect if it has uncommitted writes, so
> > > > > > its default implementation must always return either true or false,
> > > > > > signaling whether it is restored into a valid committed-only state.
> > > > > > If StateStore#recover always returns true, we preserve uncommitted
> > > > > > writes and violate correctness. Otherwise,
> > > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint would
> > > > > > always delete local data even after a graceful shutdown.
> > > > > >
> > > > > >
> > > > > > With StateStore#transactional we avoid checkpointing non-txn state
> > > > > > stores and prevent that problem during recovery.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Alex
> > > > > >
> > > > > > On Fri, Aug 19, 2022 at 1:05 AM Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello Alex,
> > > > > > >
> > > > > > > Thanks for the replies!
> > > > > > >
> > > > > > > > As long as we allow custom user implementations of that interface,
> > > > > > > > we should probably either keep that flag to distinguish between
> > > > > > > > transactional and non-transactional implementations or change the
> > > > > > > > contract behind the interface. What do you think?
> > > > > > >
> > > > > > > Regarding this question, I thought that in the long run, we may
> > > > > > > always write checkpoints regardless of txn v.s. non-txn stores, in
> > > > > > > which case we would not need that `StateStore#transactional()`. But
> > > > > > > for now, to handle backward compatibility edge cases, we still need
> > > > > > > to distinguish whether or not to write checkpoints. Maybe I was
> > > > > > > misreading its purpose? If yes, please let me know.
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Aug 15, 2022 at 7:56 AM Alexander Sorokoumov
> > > > > > > <asorokou...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hey Guozhang,
> > > > > > > >
> > > > > > > > Thank you for elaborating! I like your idea to introduce a
> > > > > > > > StreamsConfig specifically for the default store APIs. You
> > > > > > > > mentioned Materialized, but I think changes in StreamJoined follow
> > > > > > > > the same logic.
> > > > > > > >
> > > > > > > > I updated the KIP and the prototype according to your suggestions:
> > > > > > > > * Add a new StoreType and a StreamsConfig for transactional RocksDB.
> > > > > > > > * Decide whether Materialized/StreamJoined are transactional based
> > > > > > > > on the configured StoreType.
> > > > > > > > * Move RocksDBTransactionalMechanism to
> > > > > > > > org.apache.kafka.streams.state.internals to remove it from the
> > > > > > > > proposal scope.
> > > > > > > > * Add a flag in new Stores methods to configure a state store as
> > > > > > > > transactional. Transactional state stores use the default
> > > > > > > > transactional mechanism.
> > > > > > > > * The changes above allowed me to remove all changes to the
> > > > > > > > StoreSupplier interface.
> > > > > > > >
> > > > > > > > I am not sure about marking StateStore#transactional() as evolving.
> > > > > > > > As long as we allow custom user implementations of that interface,
> > > > > > > > we should probably either keep that flag to distinguish between
> > > > > > > > transactional and non-transactional implementations or change the
> > > > > > > > contract behind the interface. What do you think?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Alex
> > > > > > > >
> > > > > > > > On Thu, Aug 11, 2022 at 1:00 AM Guozhang Wang <
> > > wangg...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello Alex,
> > > > > > > > >
> > > > > > > > > Thanks for the replies. Regarding the global config v.s. per-store
> > > > > > > > > spec, I agree with John's early comments to some degree, but I
> > > > > > > > > think we may well distinguish a couple scenarios here. In sum we
> > > > > > > > > are discussing the following levels of per-store spec:
> > > > > > > > >
> > > > > > > > > * Materialized#transactional()
> > > > > > > > > * StoreSupplier#transactional()
> > > > > > > > > * StateStore#transactional()
> > > > > > > > > * Stores.persistentTransactionalKeyValueStore()...
> > > > > > > > >
> > > > > > > > > And my thoughts are the following:
> > > > > > > > >
> > > > > > > > > * In the current proposal users could specify transactional as
> > > > > > > > > either "Materialized.as("storeName").withTransactionsEnabled()" or
> > > > > > > > > "Materialized.as(Stores.persistentTransactionalKeyValueStore(..))",
> > > > > > > > > which seems not necessary to me. In general, the more options the
> > > > > > > > > library provides, the messier for users to learn the new APIs.
> > > > > > > > >
> > > > > > > > > * When using built-in stores, users would usually go with
> > > > > > > > > Materialized.as("storeName"). In such cases I feel it's not very
> > > > > > > > > meaningful to specify "some of the built-in stores to be
> > > > > > > > > transactional, while others be non transactional": as long as one
> > > > > > > > > of your stores is non-transactional, you'd still pay for large
> > > > > > > > > restoration cost upon unclean failure. People may, indeed, want to
> > > > > > > > > specify different transactional mechanisms to be used across
> > > > > > > > > stores; but for whether or not the stores should be transactional,
> > > > > > > > > I feel it's really an "all or none" answer, and our built-in form
> > > > > > > > > (rocksDB) should support transactionality for all store types.
> > > > > > > > >
> > > > > > > > > * When using customized stores, users would usually go with
> > > > > > > > > Materialized.as(StoreSupplier). And it's possible that users would
> > > > > > > > > choose some to be transactional while others non-transactional
> > > > > > > > > (e.g. if their customized store only supports transactional for
> > > > > > > > > some store types, but not others).
> > > > > > > > >
> > > > > > > > > * At a per-store level, the library does not really care, or need
> > > > > > > > > to know whether that store is transactional or not at runtime,
> > > > > > > > > except that, for compatibility reasons, today we want to make sure
> > > > > > > > > the written checkpoint files do not include those non-transactional
> > > > > > > > > stores. But this check would eventually go away as one day we would
> > > > > > > > > always checkpoint files.
> > > > > > > > > ---------------------------
> > > > > > > > >
> > > > > > > > > With all of that in mind, my gut feeling is that:
> > > > > > > > >
> > > > > > > > > * Materialized#transactional(): we would not need this knob, since
> > > > > > > > > for built-in stores I think just a global config should be
> > > > > > > > > sufficient (see below), while for customized store users would need
> > > > > > > > > to specify that via the StoreSupplier anyways and not through this
> > > > > > > > > API. Hence I think for either case we do not need to expose such a
> > > > > > > > > knob on the Materialized level.
> > > > > > > > >
> > > > > > > > > * Stores.persistentTransactionalKeyValueStore(): I think we could
> > > > > > > > > refactor that function without introducing new constructors in the
> > > > > > > > > Stores factory, but just add new overloads to the existing func
> > > > > > > > > name e.g.
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > persistentKeyValueStore(final String name, final boolean transactional)
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > Plus we can augment the storeImplType as introduced in
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
> > > > > > > > > as a syntax sugar for users, e.g.
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > public enum StoreImplType {
> > > > > > > > >     ROCKS_DB,
> > > > > > > > >     TXN_ROCKS_DB,
> > > > > > > > >     IN_MEMORY
> > > > > > > > > }
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > stream.groupByKey().count(Materialized.withStoreType(StoreImplType.TXN_ROCKS_DB));
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > The above provides this global config at the store impl type level.
> > > > > > > > >
> > > > > > > > > * RocksDBTransactionalMechanism: I agree with Bruno that we would
> > > > > > > > > better not expose this knob to users, but rather keep it purely as
> > > > > > > > > an impl detail abstracted from the "TXN_ROCKS_DB" type. Over time
> > > > > > > > > we may, e.g. use in-memory stores as the secondary stores with
> > > > > > > > > optional spill-to-disks when we hit the memory limit, but all of
> > > > > > > > > those future optimizations should be kept away from the users.
> > > > > > > > >
> > > > > > > > > * StoreSupplier#transactional() / StateStore#transactional(): the
> > > > > > > > > first flag is only used to be passed into the StateStore layer, for
> > > > > > > > > indicating if we should write checkpoints; we could mark it as
> > > > > > > > > @evolving so that we can one day remove it without a long
> > > > > > > > > deprecation period.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 10, 2022 at 8:04 AM Alexander Sorokoumov
> > > > > > > > > <asorokou...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hey Guozhang, Bruno,
> > > > > > > > > >
> > > > > > > > > > Thank you for your feedback. I am going to respond to both of you
> > > > > > > > > > in a single email. I hope it is okay.
> > > > > > > > > >
> > > > > > > > > > @Guozhang,
> > > > > > > > > >
> > > > > > > > > > > We could, instead, have a global
> > > > > > > > > > > config to specify if the built-in stores should be transactional
> > > > > > > > > > > or not.
> > > > > > > > > >
> > > > > > > > > > This was the original approach I took in this proposal. Earlier in
> > > > > > > > > > this thread John, Sagar, and Bruno listed a number of issues with
> > > > > > > > > > it. I tend to agree with them that it is probably better user
> > > > > > > > > > experience to control transactionality via Materialized objects.
> > > > > > > > > >
> > > > > > > > > > > We could simplify our implementation for `commit`
> > > > > > > > > >
> > > > > > > > > > Agreed! I updated the prototype and removed references to the
> > > > > > > > > > commit marker and rolling forward from the proposal.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > @Bruno,
> > > > > > > > > >
> > > > > > > > > > > So, I would remove the details about the 2-state-store
> > > > > > > > > > > implementation from the KIP or provide it as an example of a
> > > > > > > > > > > possible implementation at the end of the KIP.
> > > > > > > > > >
> > > > > > > > > > I moved the section about the 2-state-store implementation to the
> > > > > > > > > > bottom of the proposal and always mention it as a reference
> > > > > > > > > > implementation. Please let me know if this is okay.
> > > > > > > > > >
> > > > > > > > > > > Could you please describe the usage of commit() and recover() in
> > > > > > > > > > > the commit workflow in the KIP as we did in this thread but
> > > > > > > > > > > independently from the state store implementation?
> > > > > > > > > >
> > > > > > > > > > I described how commit/recover change the workflow in the Overview
> > > > > > > > > > section.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Alex
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 10, 2022 at 10:07 AM Bruno Cadonna <
> > > > > cado...@apache.org
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Alex,
> > > > > > > > > > >
> > > > > > > > > > > Thanks a lot for explaining!
> > > > > > > > > > >
> > > > > > > > > > > Now some aspects are clearer to me.
> > > > > > > > > > >
> > > > > > > > > > > While I understand now, how the state store can roll forward, I
> > > > > > > > > > > have the feeling that rolling forward is specific to the
> > > > > > > > > > > 2-state-store implementation with RocksDB of your PoC. Other
> > > > > > > > > > > state store implementations might use a different strategy to
> > > > > > > > > > > react to crashes. For example, they might apply an atomic write
> > > > > > > > > > > and effectively rollback if they crash before committing the
> > > > > > > > > > > state store transaction. I think the KIP should not contain such
> > > > > > > > > > > implementation details but provide an interface to accommodate
> > > > > > > > > > > rolling forward and rolling backward.
> > > > > > > > > > >
> > > > > > > > > > > So, I would remove the details about the 2-state-store
> > > > > > > > > > > implementation from the KIP or provide it as an example of a
> > > > > > > > > > > possible implementation at the end of the KIP.
> > > > > > > > > > >
> > > > > > > > > > > Since a state store implementation can roll forward or roll
> > > > > > > > > > > back, I think it is fine to return the changelog offset from
> > > > > > > > > > > recover(). With the returned changelog offset, Streams knows
> > > > > > > > > > > from where to start state store restoration.
> > > > > > > > > > >
> > > > > > > > > > > Could you please describe the usage of commit() and recover() in
> > > > > > > > > > > the commit workflow in the KIP as we did in this thread but
> > > > > > > > > > > independently from the state store implementation? That would
> > > > > > > > > > > make things clearer. Additionally, descriptions of failure
> > > > > > > > > > > scenarios would also be helpful.
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Bruno
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On 04.08.22 16:39, Alexander Sorokoumov wrote:
> > > > > > > > > > > > Hey Bruno,
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you for the suggestions and the clarifying questions. I
> > > > > > > > > > > > believe that they cover the core of this proposal, so it is
> > > > > > > > > > > > crucial for us to be on the same page.
> > > > > > > > > > > >
> > > > > > > > > > > > > 1. Don't you want to deprecate StateStore#flush().
> > > > > > > > > > > >
> > > > > > > > > > > > Good call! I updated both the proposal and the prototype.
> > > > > > > > > > > >
> > > > > > > > > > > > > 2. I would shorten Materialized#withTransactionalityEnabled()
> > > > > > > > > > > > > to Materialized#withTransactionsEnabled().
> > > > > > > > > > > >
> > > > > > > > > > > > Turns out, these methods are no longer necessary. I removed
> > > > > > > > > > > > them from the proposal and the prototype.
> > > > > > > > > > > >
> > > > > > > > > > > > > 3. Could you also describe a bit more in detail where the
> > > > > > > > > > > > > offsets passed into commit() and recover() come from?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > The offset passed into StateStore#commit is the last offset
> > > > > > > > > > > > committed to the changelog topic. The offset passed into
> > > > > > > > > > > > StateStore#recover is the last checkpointed offset for the
> > > > > > > > > > > > given StateStore. Let's look at steps 3 and 4 in the commit
> > > > > > > > > > > > workflow. After the TaskExecutor/TaskManager commits, it calls
> > > > > > > > > > > > StreamTask#postCommit[1] that in turn:
> > > > > > > > > > > > a. updates the changelog offsets via
> > > > > > > > > > > > ProcessorStateManager#updateChangelogOffsets[2]. The offsets
> > > > > > > > > > > > here come from the RecordCollector[3], which tracks the latest
> > > > > > > > > > > > offsets the producer sent without exception[4, 5].
> > > > > > > > > > > > b. flushes/commits the state store in
> > > > > > > > > > > > AbstractTask#maybeCheckpoint[6]. This method essentially calls
> > > > > > > > > > > > ProcessorStateManager methods - flush/commit[7] and
> > > > > > > > > > > > checkpoint[8]. ProcessorStateManager#commit goes over all state
> > > > > > > > > > > > stores that belong to that task and commits them with the
> > > > > > > > > > > > offset obtained in step `a`. ProcessorStateManager#checkpoint
> > > > > > > > > > > > writes down those offsets for all state stores, except for
> > > > > > > > > > > > non-transactional ones in the case of EOS.
> > > > > > > > > > > >
> > > > > > > > > > > > During initialization, StreamTask calls
> > > > > > > > > > > > StateManagerUtil#registerStateStores[8] that in turn calls
> > > > > > > > > > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9].
> > > > > > > > > > > > At the moment, this method assigns checkpointed offsets to the
> > > > > > > > > > > > corresponding state stores[10]. The prototype also calls
> > > > > > > > > > > > StateStore#recover with the checkpointed offset and assigns the
> > > > > > > > > > > > offset returned by recover()[11].
> > > > > > > > > > > >
> > > > > > > > > > > > > 4. I do not quite understand how a state store can roll
> > > > > > > > > > > > > forward. You mention in the thread the following:
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > The 2-state-stores commit looks like this [12]:
> > > > > > > > > > > >
> > > > > > > > > > > >     1. Flush the temporary state store.
> > > > > > > > > > > >     2. Create a commit marker with a changelog offset
> > > > > > > > > > > >     corresponding to the state we are committing.
> > > > > > > > > > > >     3. Go over all keys in the temporary store and write them
> > > > > > > > > > > >     down to the main one.
> > > > > > > > > > > >     4. Wipe the temporary store.
> > > > > > > > > > > >     5. Delete the commit marker.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Let's consider crash failure scenarios:
> > > > > > > > > > > >
> > > > > > > > > > > >     - Crash failure happens between steps 1 and 2. The main
> > > > > > > > > > > >     state store is in a consistent state that corresponds to
> > > > > > > > > > > >     the previously checkpointed offset. StateStore#recover
> > > > > > > > > > > >     throws away the temporary store and proceeds from the last
> > > > > > > > > > > >     checkpointed offset.
> > > > > > > > > > > >     - Crash failure happens between steps 2 and 3. We do not
> > > > > > > > > > > >     know what keys from the temporary store were already
> > > > > > > > > > > >     written to the main store, so we can't roll back. There are
> > > > > > > > > > > >     two options - either wipe the main store or roll forward.
> > > > > > > > > > > >     Since the point of this proposal is to avoid situations
> > > > > > > > > > > >     where we throw away the state and we do not care to what
> > > > > > > > > > > >     consistent state the store rolls to, we roll forward by
> > > > > > > > > > > >     continuing from step 3.
> > > > > > > > > > > >     - Crash failure happens between steps 3 and 4. We can't
> > > > > > > > > > > >     distinguish between this and the previous scenario, so we
> > > > > > > > > > > >     write all the keys from the temporary store. This is okay
> > > > > > > > > > > >     because the operation is idempotent.
> > > > > > > > > > > >     - Crash failure happens between steps 4 and 5. Again, we
> > > > > > > > > > > >     can't distinguish between this and previous scenarios, but
> > > > > > > > > > > >     the temporary store is already empty. Even though we write
> > > > > > > > > > > >     all keys from the temporary store, this operation is, in
> > > > > > > > > > > >     fact, no-op.
> > > > > > > > > > > >     - Crash failure happens between step 5 and checkpoint. This
> > > > > > > > > > > >     is the case you referred to in question 5. The commit is
> > > > > > > > > > > >     finished, but it is not reflected at the checkpoint.
> > > > > > > > > > > >     recover() returns the offset of the previous commit here,
> > > > > > > > > > > >     which is incorrect, but it is okay because we will replay
> > > > > > > > > > > >     the changelog from the previously committed offset. As
> > > > > > > > > > > >     changelog replay is idempotent, the state store recovers
> > > > > > > > > > > >     into a consistent state.
> > > > > > > > > > > >
> > > > > > > > > > > > The last crash failure scenario is a natural
> transition
> > > to
> > > > > > > > > > > >
> > > > > > > > > > > > how should Streams know what to write into the
> > checkpoint
> > > > > file
> > > > > > > > > > > >> after the crash?
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > > > As mentioned above, the Streams app writes the checkpoint file after the
> > > > > > > > > > > > > Kafka transaction and then the StateStore commit. Same as without the
> > > > > > > > > > > > > proposal, it should write the committed offset, as it is the same for
> > > > > > > > > > > > > both the Kafka changelog and the state store.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >> This issue arises because we store the offset outside of the state
> > > > > > > > > > > > >> store. Maybe we need an additional method on the state store interface
> > > > > > > > > > > > >> that returns the offset at which the state store is.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > > In my opinion, we should include in the interface only the guarantees
> > > > > > > > > > > > > that are necessary to preserve EOS without wiping the local state. This
> > > > > > > > > > > > > way, we allow more room for possible implementations. Thanks to the
> > > > > > > > > > > > > idempotency of the changelog replay, it is "good enough" if
> > > > > > > > > > > > > StateStore#recover returns an offset that is lower than the store's
> > > > > > > > > > > > > actual offset. The only limitation here is that the state store should
> > > > > > > > > > > > > never commit writes that are not yet committed in the Kafka changelog.
> > > > > > > > > > > >
> > > > > > > > > > > > > Please let me know what you think about this. First of all, I am
> > > > > > > > > > > > > relatively new to the codebase, so I might be wrong in my understanding
> > > > > > > > > > > > > of how it works. Second, while writing this, it occurred to me that the
> > > > > > > > > > > > > StateStore#recover interface method is not as straightforward as it
> > > > > > > > > > > > > could be. Maybe we can change it like this:
> > > > > > > > > > > >
> > > > > > > > > > > > /**
> > > > > > > > > > > >      * Recover a transactional state store
> > > > > > > > > > > >      * <p>
> > > > > > > > > > > >      * If a transactional state store shut down with
> a
> > > > crash
> > > > > > > > failure,
> > > > > > > > > > > this
> > > > > > > > > > > > method ensures that the
> > > > > > > > > > > >      * state store is in a consistent state that
> > > > corresponds
> > > > > to
> > > > > > > > > {@code
> > > > > > > > > > > > changelofOffset} or later.
> > > > > > > > > > > >      *
> > > > > > > > > > > >      * @param changelogOffset the checkpointed
> > changelog
> > > > > > offset.
> > > > > > > > > > > >      * @return {@code true} if recovery succeeded,
> > {@code
> > > > > > false}
> > > > > > > > > > > otherwise.
> > > > > > > > > > > >      */
> > > > > > > > > > > > boolean recover(final Long changelogOffset) {
> > > > > > > > > > > >
> > > > > > > > > > > > > Note: all links below except for [10] lead to the prototype's code.
> > > > > > > > > > > > > 1. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L468
> > > > > > > > > > > > > 2. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L580
> > > > > > > > > > > > > 3. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L868
> > > > > > > > > > > > > 4. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L94-L96
> > > > > > > > > > > > > 5. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L213-L216
> > > > > > > > > > > > > 6. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L94-L97
> > > > > > > > > > > > > 7. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L469
> > > > > > > > > > > > > 8. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L226
> > > > > > > > > > > > > 9. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L103
> > > > > > > > > > > > > 10. https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L251-L252
> > > > > > > > > > > > > 11. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L250-L265
> > > > > > > > > > > > > 12. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java#L84-L88
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Alex
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Jul 29, 2022 at 3:42 PM Bruno Cadonna <
> > > > > > > cado...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> Hi Alex,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for the updates!
> > > > > > > > > > > >>
> > > > > > > > > > > > >> 1. Don't you want to deprecate StateStore#flush()? As far as I
> > > > > > > > > > > > >> understand, commit() is the new flush(), right? If you do not deprecate
> > > > > > > > > > > > >> it, you don't get rid of the room for error you describe in your KIP by
> > > > > > > > > > > > >> having a flush() and a commit().
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> 2. I would shorten
> > > > > Materialized#withTransactionalityEnabled()
> > > > > > to
> > > > > > > > > > > >> Materialized#withTransactionsEnabled().
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> 3. Could you also describe a bit more in detail
> where
> > > the
> > > > > > > offsets
> > > > > > > > > > passed
> > > > > > > > > > > >> into commit() and recover() come from?
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> For my next two points, I need the commit workflow
> > that
> > > > you
> > > > > > were
> > > > > > > > so
> > > > > > > > > > kind
> > > > > > > > > > > >> to post into this thread:
> > > > > > > > > > > >>
> > > > > > > > > > > >> 1. write stuff to the state store
> > > > > > > > > > > >> 2. producer.sendOffsetsToTransaction(token);
> > > > > > > > > > > producer.commitTransaction();
> > > > > > > > > > > >> 3. flush (<- that would be call to commit(), right?)
> > > > > > > > > > > >> 4. checkpoint
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > > >> 4. I do not quite understand how a state store can roll forward. You
> > > > > > > > > > > > >> mention in the thread the following:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> "If the crash failure happens during #3, the state store can roll
> > > > > > > > > > > > >> forward and finish the flush/commit."
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> How does the state store know where it stopped the flushing when it
> > > > > > > > > > > > >> crashed?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> This seems like an optimization to me. I think in general the state
> > > > > > > > > > > > >> store should roll back to the last successfully committed state and
> > > > > > > > > > > > >> restore from there until the end of the changelog topic partition. The
> > > > > > > > > > > > >> last committed state is the offsets in the checkpoint file.
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > > >> 5. In the same e-mail from point 4, you also state:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> "If the crash failure happens between #3 and #4, the state store should
> > > > > > > > > > > > >> do nothing during recovery and just proceed with the checkpoint."
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> How should Streams know that the failure was between #3 and #4 during
> > > > > > > > > > > > >> recovery? It just sees a valid state store and a valid checkpoint file.
> > > > > > > > > > > > >> Streams does not know that the state of the checkpoint file does not
> > > > > > > > > > > > >> match with the committed state of the state store.
> > > > > > > > > > > > >> Also, how should Streams know what to write into the checkpoint file
> > > > > > > > > > > > >> after the crash?
> > > > > > > > > > > > >> This issue arises because we store the offset outside of the state
> > > > > > > > > > > > >> store. Maybe we need an additional method on the state store interface
> > > > > > > > > > > > >> that returns the offset at which the state store is.
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> Best,
> > > > > > > > > > > >> Bruno
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> On 27.07.22 11:51, Alexander Sorokoumov wrote:
> > > > > > > > > > > >>> Hey Nick,
> > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Thank you for the kind words and the feedback! I'll definitely add an
> > > > > > > > > > > > >>> option to configure the transactional mechanism in the Stores factory
> > > > > > > > > > > > >>> method via an argument, as John previously suggested, and might add the
> > > > > > > > > > > > >>> in-memory option via RocksDB Indexed Batches if I figure out why their
> > > > > > > > > > > > >>> creation via rocksdb jni fails with `UnsatisfiedLinkError`.
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> Best,
> > > > > > > > > > > >>> Alex
> > > > > > > > > > > >>>
> > > > > > > > > > > >>> On Wed, Jul 27, 2022 at 11:46 AM Alexander
> > Sorokoumov <
> > > > > > > > > > > >>> asorokou...@confluent.io> wrote:
> > > > > > > > > > > >>>
> > > > > > > > > > > >>>> Hey Guozhang,
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> 1) About the param passed into the `recover()`
> > > function:
> > > > > it
> > > > > > > > seems
> > > > > > > > > to
> > > > > > > > > > > me
> > > > > > > > > > > >>>>> that the semantics of "recover(offset)" is:
> recover
> > > > this
> > > > > > > state
> > > > > > > > > to a
> > > > > > > > > > > >>>>> transaction boundary which is at least the
> > passed-in
> > > > > > offset.
> > > > > > > > And
> > > > > > > > > > the
> > > > > > > > > > > >> only
> > > > > > > > > > > >>>>> possibility that the returned offset is different
> > > than
> > > > > the
> > > > > > > > > > passed-in
> > > > > > > > > > > >>>>> offset
> > > > > > > > > > > >>>>> is that if the previous failure happens after
> we've
> > > > done
> > > > > > all
> > > > > > > > the
> > > > > > > > > > > commit
> > > > > > > > > > > >>>>> procedures except writing the new checkpoint, in
> > > which
> > > > > case
> > > > > > > the
> > > > > > > > > > > >> returned
> > > > > > > > > > > >>>>> offset would be larger than the passed-in offset.
> > > > > Otherwise
> > > > > > > it
> > > > > > > > > > should
> > > > > > > > > > > >>>>> always be equal to the passed-in offset, is that
> > > right?
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Right now, the only case when `recover` returns an offset different
> > > > > > > > > > > > >>>> from the passed one is when the failure happens *during* commit.
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> If the failure happens after commit but before the checkpoint,
> > > > > > > > > > > > >>>> `recover` might return either the passed or a newer committed offset,
> > > > > > > > > > > > >>>> depending on the implementation. The `recover` implementation in the
> > > > > > > > > > > > >>>> prototype returns the passed offset because it deletes the commit
> > > > > > > > > > > > >>>> marker that holds that offset after the commit is done. In that case,
> > > > > > > > > > > > >>>> the store will replay the last commit from the changelog. I think it
> > > > > > > > > > > > >>>> is fine as the changelog replay is idempotent.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> 2) It seems the only use for the "transactional()"
> > > > > function
> > > > > > is
> > > > > > > > to
> > > > > > > > > > > >> determine
> > > > > > > > > > > >>>>> if we can update the checkpoint file while in
> EOS.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> Right now, there are 2 other uses for `transactional()`:
> > > > > > > > > > > > >>>> 1. To determine what to do during initialization if the checkpoint is
> > > > > > > > > > > > >>>> gone (see [1]). If the state store is transactional, we don't have to
> > > > > > > > > > > > >>>> wipe the existing data. Thinking about it now, we do not really need
> > > > > > > > > > > > >>>> this check whether the store is `transactional` because if it is not,
> > > > > > > > > > > > >>>> we'd not have written the checkpoint in the first place. I am going to
> > > > > > > > > > > > >>>> remove that check.
> > > > > > > > > > > > >>>> 2. To determine if the persistent kv store in KStreamImplJoin should be
> > > > > > > > > > > > >>>> transactional (see [2], [3]).
> > > > > > > > > > > > >>>>
> > > > > > > > > > > > >>>> I am not sure if we can get rid of the checks in point 2. If so, I'd be
> > > > > > > > > > > > >>>> happy to encapsulate `transactional()` logic in `commit/recover`.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> Best,
> > > > > > > > > > > >>>> Alex
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> 1.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
> > > > > > > > > > > >>>> 2.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
> > > > > > > > > > > >>>> 3.
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>> On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <
> > > > > > > > > > nick.telf...@gmail.com>
> > > > > > > > > > > >>>> wrote:
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>>> Hi Alex,
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Excellent proposal, I'm very keen to see this
> land!
> > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> Would it be useful to permit configuring the type of store used for
> > > > > > > > > > > > >>>>> uncommitted offsets on a store-by-store basis? This way, users could
> > > > > > > > > > > > >>>>> choose whether to use, e.g. an in-memory store or RocksDB, potentially
> > > > > > > > > > > > >>>>> reducing the overheads associated with RocksDB for smaller stores, but
> > > > > > > > > > > > >>>>> without the memory pressure issues?
> > > > > > > > > > > > >>>>>
> > > > > > > > > > > > >>>>> I suspect that in most cases, the number of uncommitted records will be
> > > > > > > > > > > > >>>>> very small, because the default commit interval is 100ms.
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Regards,
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> Nick
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>> On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <
> > > > > > > > wangg...@gmail.com>
> > > > > > > > > > > >> wrote:
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>>> Hello Alex,
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Thanks for the updated KIP, I looked over it and
> > > > browsed
> > > > > > the
> > > > > > > > WIP
> > > > > > > > > > and
> > > > > > > > > > > >>>>> just
> > > > > > > > > > > >>>>>> have a couple meta thoughts:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> 1) About the param passed into the `recover()`
> > > > function:
> > > > > > it
> > > > > > > > > seems
> > > > > > > > > > to
> > > > > > > > > > > >> me
> > > > > > > > > > > >>>>>> that the semantics of "recover(offset)" is:
> > recover
> > > > this
> > > > > > > state
> > > > > > > > > to
> > > > > > > > > > a
> > > > > > > > > > > >>>>>> transaction boundary which is at least the
> > passed-in
> > > > > > offset.
> > > > > > > > And
> > > > > > > > > > the
> > > > > > > > > > > >>>>> only
> > > > > > > > > > > >>>>>> possibility that the returned offset is
> different
> > > than
> > > > > the
> > > > > > > > > > passed-in
> > > > > > > > > > > >>>>> offset
> > > > > > > > > > > >>>>>> is that if the previous failure happens after
> > we've
> > > > done
> > > > > > all
> > > > > > > > the
> > > > > > > > > > > >> commit
> > > > > > > > > > > >>>>>> procedures except writing the new checkpoint, in
> > > which
> > > > > > case
> > > > > > > > the
> > > > > > > > > > > >> returned
> > > > > > > > > > > >>>>>> offset would be larger than the passed-in
> offset.
> > > > > > Otherwise
> > > > > > > it
> > > > > > > > > > > should
> > > > > > > > > > > >>>>>> always be equal to the passed-in offset, is that
> > > > right?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> 2) It seems the only use for the
> "transactional()"
> > > > > > function
> > > > > > > is
> > > > > > > > > to
> > > > > > > > > > > >>>>> determine
> > > > > > > > > > > >>>>>> if we can update the checkpoint file while in
> EOS.
> > > But
> > > > > the
> > > > > > > > > purpose
> > > > > > > > > > > of
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>> checkpoint file's offsets is just to tell "the
> > local
> > > > > > state's
> > > > > > > > > > current
> > > > > > > > > > > >>>>>> snapshot's progress is at least the indicated
> > > offsets"
> > > > > > > > anyways,
> > > > > > > > > > and
> > > > > > > > > > > >> with
> > > > > > > > > > > >>>>>> this KIP maybe we would just do:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> a) when in ALOS, upon failover: we set the starting offset as
> > > > > > > > > > > > >>>>>> checkpointed-offset, then restore() from changelog till the end-offset.
> > > > > > > > > > > > >>>>>> This way we may restore some records twice.
> > > > > > > > > > > > >>>>>> b) when in EOS, upon failover: we first call
> > > > > > > > > > > > >>>>>> recover(checkpointed-offset), then set the starting offset as the
> > > > > > > > > > > > >>>>>> returned offset (which may be larger than checkpointed-offset), then
> > > > > > > > > > > > >>>>>> restore until the end-offset.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > > >>>>>> So why not also:
> > > > > > > > > > > > >>>>>> c) we let the `commit()` function also return an offset, which
> > > > > > > > > > > > >>>>>> indicates "checkpointable offsets".
> > > > > > > > > > > > >>>>>> d) for existing non-transactional stores, we just have a default
> > > > > > > > > > > > >>>>>> implementation of "commit()" which is simply a flush, and returns a
> > > > > > > > > > > > >>>>>> sentinel value like -1. Then later if we get checkpointable offsets -1,
> > > > > > > > > > > > >>>>>> we do not write the checkpoint. Upon clean shutdown we can just
> > > > > > > > > > > > >>>>>> checkpoint regardless of the returned value from "commit".
> > > > > > > > > > > > >>>>>> e) for existing non-transactional stores, we just have a default
> > > > > > > > > > > > >>>>>> implementation of "recover()" which is to wipe out the local store and
> > > > > > > > > > > > >>>>>> return offset 0 if the passed in offset is -1; otherwise, if it is not
> > > > > > > > > > > > >>>>>> -1, then it indicates a clean shutdown in the last run, and this
> > > > > > > > > > > > >>>>>> function is just a no-op.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> In that case, we would not need the
> > > "transactional()"
> > > > > > > function
> > > > > > > > > > > >> anymore,
> > > > > > > > > > > >>>>>> since for non-transactional stores their
> behaviors
> > > are
> > > > > > still
> > > > > > > > > > wrapped
> > > > > > > > > > > >> in
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>> `commit / recover` function pairs.
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> I have not completed the thorough pass on your
> WIP
> > > PR,
> > > > > so
> > > > > > > > maybe
> > > > > > > > > I
> > > > > > > > > > > >> could
> > > > > > > > > > > >>>>>> come up with some more feedback later, but just
> > let
> > > me
> > > > > > know
> > > > > > > if
> > > > > > > > > my
> > > > > > > > > > > >>>>>> understanding above is correct or not?
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> Guozhang
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> On Thu, Jul 14, 2022 at 7:01 AM Alexander
> > Sorokoumov
> > > > > > > > > > > >>>>>> <asorokou...@confluent.io.invalid> wrote:
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>> Hi,
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I updated the KIP with the following changes:
> > > > > > > > > > > >>>>>>> * Replaced in-memory batches with the
> > > secondary-store
> > > > > > > > approach
> > > > > > > > > as
> > > > > > > > > > > the
> > > > > > > > > > > >>>>>>> default implementation to address the feedback
> > > about
> > > > > > memory
> > > > > > > > > > > pressure
> > > > > > > > > > > >>>>> as
> > > > > > > > > > > >>>>>>> suggested by Sagar and Bruno.
> > > > > > > > > > > >>>>>>> * Introduced StateStore#commit and
> > > StateStore#recover
> > > > > > > methods
> > > > > > > > > as
> > > > > > > > > > an
> > > > > > > > > > > >>>>>>> extension of the rollback idea. @Guozhang,
> please
> > > see
> > > > > the
> > > > > > > > > comment
> > > > > > > > > > > >>>>> below
> > > > > > > > > > > >>>>>> on
> > > > > > > > > > > >>>>>>> why I took a slightly different approach than
> you
> > > > > > > suggested.
> > > > > > > > > > > >>>>>>> * Removed mentions of changes to IQv1 and IQv2.
> > > > > > > Transactional
> > > > > > > > > > state
> > > > > > > > > > > >>>>>> stores
> > > > > > > > > > > >>>>>>> enable reading committed in IQ, but it is
> really
> > an
> > > > > > > > independent
> > > > > > > > > > > >>>>> feature
> > > > > > > > > > > >>>>>>> that deserves its own KIP. Conflating them
> > > > > unnecessarily
> > > > > > > > > > increases
> > > > > > > > > > > >> the
> > > > > > > > > > > >>>>>>> scope for discussion, implementation, and
> testing
> > > in
> > > > a
> > > > > > > single
> > > > > > > > > > unit
> > > > > > > > > > > of
> > > > > > > > > > > >>>>>> work.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> I also published a prototype -
> > > > > > > > > > > >>>>>> https://github.com/apache/kafka/pull/12393
> > > > > > > > > > > >>>>>>> that implements changes described in the
> > proposal.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> Regarding explicit rollback, I think it is a powerful idea that allows
> > > > > > > > > > > > >>>>>>> other StateStore implementations to take a different path to the
> > > > > > > > > > > > >>>>>>> transactional behavior rather than keep 2 state stores. Instead of
> > > > > > > > > > > > >>>>>>> introducing a new commit token, I suggest using a changelog offset
> > > > > > > > > > > > >>>>>>> that already 1:1 corresponds to the materialized state. This works
> > > > > > > > > > > > >>>>>>> nicely because Kafka Streams first commits an AK transaction and only
> > > > > > > > > > > > >>>>>>> then checkpoints the state store, so we can use the changelog offset
> > > > > > > > > > > > >>>>>>> to commit the state store transaction.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> I called the method StateStore#recover rather than StateStore#rollback
> > > > > > > > > > > > >>>>>>> because a state store might either roll back or forward depending on
> > > > > > > > > > > > >>>>>>> the specific point of the crash failure. Consider the write algorithm
> > > > > > > > > > > > >>>>>>> in Kafka Streams:
> > > > > > > > > > > > >>>>>>> 1. write stuff to the state store
> > > > > > > > > > > > >>>>>>> 2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
> > > > > > > > > > > > >>>>>>> 3. flush
> > > > > > > > > > > > >>>>>>> 4. checkpoint
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > > >>>>>>> Let's consider 3 cases:
> > > > > > > > > > > > >>>>>>> 1. If the crash failure happens between #2 and #3, the state store
> > > > > > > > > > > > >>>>>>> rolls back and replays the uncommitted transaction from the changelog.
> > > > > > > > > > > > >>>>>>> 2. If the crash failure happens during #3, the state store can roll
> > > > > > > > > > > > >>>>>>> forward and finish the flush/commit.
> > > > > > > > > > > > >>>>>>> 3. If the crash failure happens between #3 and #4, the state store
> > > > > > > > > > > > >>>>>>> should do nothing during recovery and just proceed with the checkpoint.
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> Looking forward to your feedback,
> > > > > > > > > > > >>>>>>> Alexander
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>> On Wed, Jun 8, 2022 at 12:16 AM Alexander
> > > Sorokoumov
> > > > <
> > > > > > > > > > > >>>>>>> asorokou...@confluent.io> wrote:
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>>> Hi,
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> As a status update, I did the following
> changes
> > to
> > > > the
> > > > > > > KIP:
> > > > > > > > > > > >>>>>>>> * replaced configuration via the top-level
> > config
> > > > with
> > > > > > > > > > > configuration
> > > > > > > > > > > >>>>>> via
> > > > > > > > > > > >>>>>>>> Stores factory and StoreSuppliers,
> > > > > > > > > > > >>>>>>>> * added IQv2 and elaborated how readCommitted
> > will
> > > > > work
> > > > > > > when
> > > > > > > > > the
> > > > > > > > > > > >>>>> store
> > > > > > > > > > > >>>>>> is
> > > > > > > > > > > >>>>>>>> not transactional,
> > > > > > > > > > > >>>>>>>> * removed claims about ALOS.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> I am going to be OOO in the next couple of
> weeks
> > > and
> > > > > > will
> > > > > > > > > resume
> > > > > > > > > > > >>>>>> working
> > > > > > > > > > > >>>>>>>> on the proposal and responding to the
> discussion
> > > in
> > > > > this
> > > > > > > > > thread
> > > > > > > > > > > >>>>>> starting
> > > > > > > > > > > >>>>>>>> June 27. My next top priorities are:
> > > > > > > > > > > >>>>>>>> 1. Prototype the rollback approach as
> suggested
> > by
> > > > > > > Guozhang.
> > > > > > > > > > > >>>>>>>> 2. Replace in-memory batches with the
> > > > secondary-store
> > > > > > > > approach
> > > > > > > > > > as
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>> default implementation to address the feedback
> > > about
> > > > > > > memory
> > > > > > > > > > > >>>>> pressure as
> > > > > > > > > > > >>>>>>>> suggested by Sagar and Bruno.
> > > > > > > > > > > >>>>>>>> 3. Adjust Stores methods to make transactional
> > > > > > > > implementations
> > > > > > > > > > > >>>>>> pluggable.
> > > > > > > > > > > >>>>>>>> 4. Publish the POC for the first review.
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> Best regards,
> > > > > > > > > > > >>>>>>>> Alex
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>> On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <
> > > > > > > > > > wangg...@gmail.com>
> > > > > > > > > > > >>>>>> wrote:
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>>> Alex,
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Thanks for your replies! That is very
> helpful.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Just to broaden our discussions a bit here, I
> > > think
> > > > > > there
> > > > > > > > are
> > > > > > > > > > > some
> > > > > > > > > > > >>>>>> other
> > > > > > > > > > > >>>>>>>>> approaches in parallel to the idea of
> "enforce
> > to
> > > > > only
> > > > > > > > > persist
> > > > > > > > > > > upon
> > > > > > > > > > > >>>>>>>>> explicit flush" and I'd like to throw one
> here
> > --
> > > > not
> > > > > > > > really
> > > > > > > > > > > >>>>>> advocating
> > > > > > > > > > > >>>>>>>>> it,
> > > > > > > > > > > >>>>>>>>> but just for us to compare the pros and cons:
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> 1) We let the StateStore's `flush` function
> to
> > > > > return a
> > > > > > > > token
> > > > > > > > > > > >>>>> instead
> > > > > > > > > > > >>>>>> of
> > > > > > > > > > > >>>>>>>>> returning `void`.
> > > > > > > > > > > >>>>>>>>> 2) We add another `rollback(token)` interface
> > of
> > > > > > > StateStore
> > > > > > > > > > which
> > > > > > > > > > > >>>>>> would
> > > > > > > > > > > >>>>>>>>> effectively rollback the state as indicated
> by
> > > the
> > > > > > token
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > >>>>>> snapshot
> > > > > > > > > > > >>>>>>>>> when the corresponding `flush` is called.
> > > > > > > > > > > >>>>>>>>> 3) We encode the token and commit as part of
> > > > > > > > > > > >>>>>>>>> `producer#sendOffsetsToTransaction`.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Users could optionally implement the new
> > > functions,
> > > > > or
> > > > > > > they
> > > > > > > > > can
> > > > > > > > > > > >>>>> just
> > > > > > > > > > > >>>>>> not
> > > > > > > > > > > >>>>>>>>> return the token at all and not implement the
> > > > second
> > > > > > > > > function.
> > > > > > > > > > > >>>>> Again,
> > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > >>>>>>>>> APIs are just for the sake of illustration,
> not
> > > > > feeling
> > > > > > > > they
> > > > > > > > > > are
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>> most
> > > > > > > > > > > >>>>>>>>> natural :)
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Then the procedure would be:
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> 1. the previous checkpointed offset is 100
> > > > > > > > > > > >>>>>>>>> ...
> > > > > > > > > > > >>>>>>>>> 3. flush store, make sure all writes are
> > > persisted;
> > > > > get
> > > > > > > the
> > > > > > > > > > > >>>>> returned
> > > > > > > > > > > >>>>>>> token
> > > > > > > > > > > >>>>>>>>> that indicates the snapshot of 200.
> > > > > > > > > > > >>>>>>>>> 4. producer.sendOffsetsToTransaction(token);
> > > > > > > > > > > >>>>>>> producer.commitTransaction();
> > > > > > > > > > > >>>>>>>>> 5. Update the checkpoint file (say, the new
> > value
> > > > is
> > > > > > > 200).
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Then if there's a failure, say between 3/4,
> we
> > > > would
> > > > > > get
> > > > > > > > the
> > > > > > > > > > > token
> > > > > > > > > > > >>>>>> from
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>> last committed txn, and first we would do the
> > > > > > restoration
> > > > > > > > > > (which
> > > > > > > > > > > >>>>> may
> > > > > > > > > > > >>>>>> get
> > > > > > > > > > > >>>>>>>>> the state to somewhere between 100 and 200),
> > then
> > > > > call
> > > > > > > > > > > >>>>>>>>> `store.rollback(token)` to rollback to the
> > > snapshot
> > > > > of
> > > > > > > > offset
> > > > > > > > > > > 100.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> The pros is that we would then not need to
> > > enforce
> > > > > the
> > > > > > > > state
> > > > > > > > > > > >>>>> stores to
> > > > > > > > > > > >>>>>>> not
> > > > > > > > > > > >>>>>>>>> persist any data during the txn: for stores
> > that
> > > > may
> > > > > > not
> > > > > > > be
> > > > > > > > > > able
> > > > > > > > > > > to
> > > > > > > > > > > >>>>>>>>> implement the `rollback` function, they can
> > still
> > > > > > reduce
> > > > > > > > its
> > > > > > > > > > impl
> > > > > > > > > > > >>>>> to
> > > > > > > > > > > >>>>>>> "not
> > > > > > > > > > > >>>>>>>>> persisting any data" via this API, but for
> > stores
> > > > > that
> > > > > > > can
> > > > > > > > > > indeed
> > > > > > > > > > > >>>>>>> support
> > > > > > > > > > > >>>>>>>>> the rollback, their implementation may be
> more
> > > > > > efficient.
> > > > > > > > The
> > > > > > > > > > > cons
> > > > > > > > > > > >>>>>>> though,
> > > > > > > > > > > >>>>>>>>> on top of my head are 1) more complicated
> logic
> > > > > > > > > differentiating
> > > > > > > > > > > >>>>>> between
> > > > > > > > > > > >>>>>>>>> EOS
> > > > > > > > > > > >>>>>>>>> with and without store rollback support, and
> > > ALOS,
> > > > 2)
> > > > > > > > > encoding
> > > > > > > > > > > the
> > > > > > > > > > > >>>>>> token
> > > > > > > > > > > >>>>>>>>> as
> > > > > > > > > > > >>>>>>>>> part of the commit offset is not ideal if it
> is
> > > > big,
> > > > > 3)
> > > > > > > the
> > > > > > > > > > > >>>>> recovery
> > > > > > > > > > > >>>>>>> logic
> > > > > > > > > > > >>>>>>>>> including the state store is also a bit more
> > > > > > complicated.
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> Guozhang
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> On Wed, Jun 1, 2022 at 1:29 PM Alexander
> > > Sorokoumov
> > > > > > > > > > > >>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Hi Guozhang,
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> But I'm still trying to clarify how it
> > > guarantees
> > > > > EOS,
> > > > > > > and
> > > > > > > > > it
> > > > > > > > > > > >>>>> seems
> > > > > > > > > > > >>>>>>>>> that we
> > > > > > > > > > > >>>>>>>>>>> would achieve it by enforcing to not
> persist
> > > any
> > > > > data
> > > > > > > > > written
> > > > > > > > > > > >>>>>> within
> > > > > > > > > > > >>>>>>>>> this
> > > > > > > > > > > >>>>>>>>>>> transaction until step 4. Is that correct?
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> This is correct. Both alternatives -
> in-memory
> > > > > > > > > > > >>>>> WriteBatchWithIndex
> > > > > > > > > > > >>>>>> and
> > > > > > > > > > > >>>>>>>>>> transactionality via the secondary store
> > > guarantee
> > > > > EOS
> > > > > > > by
> > > > > > > > > not
> > > > > > > > > > > >>>>>>> persisting
> > > > > > > > > > > >>>>>>>>>> data in the "main" state store until it is
> > > > committed
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > >>>>>> changelog
> > > > > > > > > > > >>>>>>>>>> topic.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Oh what I meant is not what KStream code
> does,
> > > but
> > > > > > that
> > > > > > > > > > > >>>>> StateStore
> > > > > > > > > > > >>>>>>> impl
> > > > > > > > > > > >>>>>>>>>>> classes themselves could potentially flush
> > data
> > > > to
> > > > > > > become
> > > > > > > > > > > >>>>>> persisted
> > > > > > > > > > > >>>>>>>>>>> asynchronously
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Thank you for elaborating! You are correct,
> > the
> > > > > > > underlying
> > > > > > > > > > state
> > > > > > > > > > > >>>>>> store
> > > > > > > > > > > >>>>>>>>>> should not persist data until the streams
> app
> > > > calls
> > > > > > > > > > > >>>>>> StateStore#flush.
> > > > > > > > > > > >>>>>>>>> There
> > > > > > > > > > > >>>>>>>>>> are 2 options how a State Store
> implementation
> > > can
> > > > > > > > guarantee
> > > > > > > > > > > >>>>> that -
> > > > > > > > > > > >>>>>>>>> either
> > > > > > > > > > > >>>>>>>>>> keep uncommitted writes in memory or be able
> > to
> > > > roll
> > > > > > > back
> > > > > > > > > the
> > > > > > > > > > > >>>>>> changes
> > > > > > > > > > > >>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>> were not committed during recovery.
> RocksDB's
> > > > > > > > > > > >>>>> WriteBatchWithIndex is
> > > > > > > > > > > >>>>>>> an
> > > > > > > > > > > >>>>>>>>>> implementation of the first option. A
> > considered
> > > > > > > > > alternative,
> > > > > > > > > > > >>>>>>>>> Transactions
> > > > > > > > > > > >>>>>>>>>> via Secondary State Store for Uncommitted
> > > Changes,
> > > > > is
> > > > > > > the
> > > > > > > > > way
> > > > > > > > > > to
> > > > > > > > > > > >>>>>>>>> implement
> > > > > > > > > > > >>>>>>>>>> the second option.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> As everyone correctly pointed out, keeping
> > > > > uncommitted
> > > > > > > > data
> > > > > > > > > in
> > > > > > > > > > > >>>>>> memory
> > > > > > > > > > > >>>>>>>>>> introduces a very real risk of OOM that we
> > will
> > > > need
> > > > > > to
> > > > > > > > > > handle.
> > > > > > > > > > > >>>>> The
> > > > > > > > > > > >>>>>>>>> more I
> > > > > > > > > > > >>>>>>>>>> think about it, the more I lean towards
> going
> > > with
> > > > > the
> > > > > > > > > > > >>>>> Transactions
> > > > > > > > > > > >>>>>>> via
> > > > > > > > > > > >>>>>>>>>> Secondary Store as the way to implement
> > > > > > transactionality
> > > > > > > > as
> > > > > > > > > it
> > > > > > > > > > > >>>>> does
> > > > > > > > > > > >>>>>>> not
> > > > > > > > > > > >>>>>>>>>> have that issue.
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> Best,
> > > > > > > > > > > >>>>>>>>>> Alex
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>> On Wed, Jun 1, 2022 at 12:59 PM Guozhang
> Wang
> > <
> > > > > > > > > > > >>>>> wangg...@gmail.com>
> > > > > > > > > > > >>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Hello Alex,
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> we flush the cache, but not the underlying
> > > state
> > > > > > > store.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> You're right. The ordering I mentioned
> above
> > is
> > > > > > > actually:
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> ...
> > > > > > > > > > > >>>>>>>>>>> 3. producer.sendOffsetsToTransaction();
> > > > > > > > > > > >>>>>>> producer.commitTransaction();
> > > > > > > > > > > >>>>>>>>>>> 4. flush store, make sure all writes are
> > > > persisted.
> > > > > > > > > > > >>>>>>>>>>> 5. Update the checkpoint file to 200.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> But I'm still trying to clarify how it
> > > guarantees
> > > > > > EOS,
> > > > > > > > and
> > > > > > > > > it
> > > > > > > > > > > >>>>>> seems
> > > > > > > > > > > >>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>> we
> > > > > > > > > > > >>>>>>>>>>> would achieve it by enforcing to not
> persist
> > > any
> > > > > data
> > > > > > > > > written
> > > > > > > > > > > >>>>>> within
> > > > > > > > > > > >>>>>>>>> this
> > > > > > > > > > > >>>>>>>>>>> transaction until step 4. Is that correct?
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Can you please point me to the place in
> the
> > > > > codebase
> > > > > > > > where
> > > > > > > > > > we
> > > > > > > > > > > >>>>>>>>> trigger
> > > > > > > > > > > >>>>>>>>>>> async flush before the commit?
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Oh what I meant is not what KStream code
> > does,
> > > > but
> > > > > > that
> > > > > > > > > > > >>>>> StateStore
> > > > > > > > > > > >>>>>>>>> impl
> > > > > > > > > > > >>>>>>>>>>> classes themselves could potentially flush
> > data
> > > > to
> > > > > > > become
> > > > > > > > > > > >>>>>> persisted
> > > > > > > > > > > >>>>>>>>>>> asynchronously, e.g. RocksDB does that
> > > naturally
> > > > > out
> > > > > > of
> > > > > > > > the
> > > > > > > > > > > >>>>>> control
> > > > > > > > > > > >>>>>>> of
> > > > > > > > > > > >>>>>>>>>>> KStream code. I think it is related to my
> > > > previous
> > > > > > > > > question:
> > > > > > > > > > > >>>>> if we
> > > > > > > > > > > >>>>>>>>> think
> > > > > > > > > > > >>>>>>>>>> by
> > > > > > > > > > > >>>>>>>>>>> guaranteeing EOS at the state store level,
> we
> > > > would
> > > > > > > > > > effectively
> > > > > > > > > > > >>>>>> ask
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>> impl classes that "you should not persist
> any
> > > > data
> > > > > > > until
> > > > > > > > > > > >>>>> `flush`
> > > > > > > > > > > >>>>>> is
> > > > > > > > > > > >>>>>>>>>> called
> > > > > > > > > > > >>>>>>>>>>> explicitly", is the StateStore interface
> the
> > > > right
> > > > > > > level
> > > > > > > > to
> > > > > > > > > > > >>>>>> enforce
> > > > > > > > > > > >>>>>>>>> such
> > > > > > > > > > > >>>>>>>>>>> mechanisms, or should we just do that on
> top
> > of
> > > > the
> > > > > > > > > > > >>>>> StateStores,
> > > > > > > > > > > >>>>>>> e.g.
> > > > > > > > > > > >>>>>>>>>>> during the transaction we just keep all the
> > > > writes
> > > > > in
> > > > > > > the
> > > > > > > > > > cache
> > > > > > > > > > > >>>>>> (of
> > > > > > > > > > > >>>>>>>>>> course
> > > > > > > > > > > >>>>>>>>>>> we need to consider how to work around
> memory
> > > > > > pressure
> > > > > > > as
> > > > > > > > > > > >>>>>> previously
> > > > > > > > > > > >>>>>>>>>>> mentioned), and then upon committing, we
> just
> > > > write
> > > > > > the
> > > > > > > > > > cached
> > > > > > > > > > > >>>>>>> records
> > > > > > > > > > > >>>>>>>>>> as a
> > > > > > > > > > > >>>>>>>>>>> whole into the store and then call flush.
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> Guozhang
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> On Tue, May 31, 2022 at 4:08 PM Alexander
> > > > > Sorokoumov
> > > > > > > > > > > >>>>>>>>>>> <asorokou...@confluent.io.invalid> wrote:
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Hey,
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Thank you for the wealth of great
> > suggestions
> > > > and
> > > > > > > > > questions!
> > > > > > > > > > > >>>>> I
> > > > > > > > > > > >>>>>> am
> > > > > > > > > > > >>>>>>>>> going
> > > > > > > > > > > >>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>> address the feedback in batches and update
> > the
> > > > > > > proposal
> > > > > > > > > > > >>>>> async,
> > > > > > > > > > > >>>>>> as
> > > > > > > > > > > >>>>>>>>> it is
> > > > > > > > > > > >>>>>>>>>>>> probably going to be easier for everyone.
> I
> > > will
> > > > > > also
> > > > > > > > > write
> > > > > > > > > > a
> > > > > > > > > > > >>>>>>>>> separate
> > > > > > > > > > > >>>>>>>>>>>> message after making updates to the KIP.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> @John,
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> Did you consider instead just adding the
> > > option
> > > > > to
> > > > > > > the
> > > > > > > > > > > >>>>>>>>>>>>> RocksDB*StoreSupplier classes and the
> > > factories
> > > > > in
> > > > > > > > > Stores ?
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Thank you for suggesting that. I think
> that
> > > this
> > > > > > idea
> > > > > > > is
> > > > > > > > > > > >>>>> better
> > > > > > > > > > > >>>>>>> than
> > > > > > > > > > > >>>>>>>>>>> what I
> > > > > > > > > > > >>>>>>>>>>>> came up with and will update the KIP with
> > > > > > configuring
> > > > > > > > > > > >>>>>>>>> transactionality
> > > > > > > > > > > >>>>>>>>>>> via
> > > > > > > > > > > >>>>>>>>>>>> the suppliers and Stores.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> what is the advantage over just doing the
> > same
> > > > > thing
> > > > > > > > with
> > > > > > > > > > the
> > > > > > > > > > > >>>>>>>>>> RecordCache
> > > > > > > > > > > >>>>>>>>>>>>> and not introducing the WriteBatch at
> all?
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Can you point me to RecordCache? I can't
> > find
> > > it
> > > > > in
> > > > > > > the
> > > > > > > > > > > >>>>> project.
> > > > > > > > > > > >>>>>>> The
> > > > > > > > > > > >>>>>>>>>>>> advantage would be that WriteBatch
> > guarantees
> > > > > write
> > > > > > > > > > > >>>>> atomicity.
> > > > > > > > > > > >>>>>> As
> > > > > > > > > > > >>>>>>>>> far
> > > > > > > > > > > >>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>> I
> > > > > > > > > > > >>>>>>>>>>>> understood the way RecordCache works, it
> > might
> > > > > leave
> > > > > > > the
> > > > > > > > > > > >>>>> system
> > > > > > > > > > > >>>>>> in
> > > > > > > > > > > >>>>>>>>> an
> > > > > > > > > > > >>>>>>>>>>>> inconsistent state during crash failure on
> > > > write.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> You mentioned that a transactional store
> can
> > > > help
> > > > > > > reduce
> > > > > > > > > > > >>>>>>>>> duplication in
> > > > > > > > > > > >>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>> case of ALOS
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> I will remove claims about ALOS from the
> > > > proposal.
> > > > > > > Thank
> > > > > > > > > you
> > > > > > > > > > > >>>>> for
> > > > > > > > > > > >>>>>>>>>>>> elaborating!
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> As a reminder, we have a new IQv2
> mechanism
> > > now.
> > > > > > > Should
> > > > > > > > we
> > > > > > > > > > > >>>>>> propose
> > > > > > > > > > > >>>>>>>>> any
> > > > > > > > > > > >>>>>>>>>>>>> changes to IQv1 to support this
> > transactional
> > > > > > > > mechanism,
> > > > > > > > > > > >>>>>> versus
> > > > > > > > > > > >>>>>>>>> just
> > > > > > > > > > > >>>>>>>>>>>>> proposing it for IQv2? Certainly, it
> seems
> > > > > strange
> > > > > > > only
> > > > > > > > > to
> > > > > > > > > > > >>>>>>>>> propose a
> > > > > > > > > > > >>>>>>>>>>>> change
> > > > > > > > > > > >>>>>>>>>>>>> for IQv1 and not v2.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>    I will update the proposal with
> > > complementary
> > > > > API
> > > > > > > > > changes
> > > > > > > > > > > >>>>> for
> > > > > > > > > > > >>>>>>> IQv2
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> What should IQ do if I request to
> > > readCommitted
> > > > > on a
> > > > > > > > > > > >>>>>>>>> non-transactional
> > > > > > > > > > > >>>>>>>>>>>>> store?
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> We can assume that non-transactional
> stores
> > > > commit
> > > > > > on
> > > > > > > > > write,
> > > > > > > > > > > >>>>> so
> > > > > > > > > > > >>>>>> IQ
> > > > > > > > > > > >>>>>>>>>> works
> > > > > > > > > > > >>>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>> the same way with non-transactional stores
> > > > > > regardless
> > > > > > > of
> > > > > > > > > the
> > > > > > > > > > > >>>>>> value
> > > > > > > > > > > >>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>> readCommitted.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>    @Guozhang,
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> * If we crash between line 3 and 4, then
> at
> > > that
> > > > > > time
> > > > > > > > the
> > > > > > > > > > > >>>>> local
> > > > > > > > > > > >>>>>>>>>>> persistent
> > > > > > > > > > > >>>>>>>>>>>>> store image is representing as of offset
> > 200,
> > > > but
> > > > > > > upon
> > > > > > > > > > > >>>>>> recovery
> > > > > > > > > > > >>>>>>>>> all
> > > > > > > > > > > >>>>>>>>>>>>> changelog records from 100 to
> > log-end-offset
> > > > > would
> > > > > > be
> > > > > > > > > > > >>>>>> considered
> > > > > > > > > > > >>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>> aborted
> > > > > > > > > > > >>>>>>>>>>>>> and not be replayed and we would restart
> > > > > processing
> > > > > > > > from
> > > > > > > > > > > >>>>>>> position
> > > > > > > > > > > >>>>>>>>>> 100.
> > > > > > > > > > > >>>>>>>>>>>>> Restart processing will violate EOS.I'm
> not
> > > > sure
> > > > > > how
> > > > > > > > e.g.
> > > > > > > > > > > >>>>>>>>> RocksDB's
> > > > > > > > > > > >>>>>>>>>>>>> WriteBatchWithIndex would make sure that
> > the
> > > > > step 4
> > > > > > > and
> > > > > > > > > > > >>>>> step 5
> > > > > > > > > > > >>>>>>>>> could
> > > > > > > > > > > >>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>> done atomically here.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Could you please point me to the place in
> > the
> > > > > > codebase
> > > > > > > > > where
> > > > > > > > > > > >>>>> a
> > > > > > > > > > > >>>>>>> task
> > > > > > > > > > > >>>>>>>>>>> flushes
> > > > > > > > > > > >>>>>>>>>>>> the store before committing the
> transaction?
> > > > > > > > > > > >>>>>>>>>>>> Looking at TaskExecutor (
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
> > > > > > > > > > > >>>>>>>>>>>> ),
> > > > > > > > > > > >>>>>>>>>>>> StreamTask#prepareCommit (
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
> > > > > > > > > > > >>>>>>>>>>>> ),
> > > > > > > > > > > >>>>>>>>>>>> and CachedStateStore (
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
> > > > > > > > > > > >>>>>>>>>>>> )
> > > > > > > > > > > >>>>>>>>>>>> we flush the cache, but not the underlying
> > > state
> > > > > > > store.
> > > > > > > > > > > >>>>> Explicit
> > > > > > > > > > > >>>>>>>>>>>> StateStore#flush happens in
> > > > > > > > > > > >>>>> AbstractTask#maybeWriteCheckpoint (
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
> > > > > > > > > > > >>>>>>>>>>>> ).
> > > > > > > > > > > >>>>>>>>>>>> Is there something I am missing here?
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Today all cached data that have not been
> > > flushed
> > > > > are
> > > > > > > not
> > > > > > > > > > > >>>>>> committed
> > > > > > > > > > > >>>>>>>>> for
> > > > > > > > > > > >>>>>>>>>>>>> sure, but even flushed data to the
> > persistent
> > > > > > > > underlying
> > > > > > > > > > > >>>>> store
> > > > > > > > > > > >>>>>>> may
> > > > > > > > > > > >>>>>>>>>> also
> > > > > > > > > > > >>>>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>> uncommitted since flushing can be
> triggered
> > > > > > > > > asynchronously
> > > > > > > > > > > >>>>>>> before
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>> commit.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Can you please point me to the place in
> the
> > > > > codebase
> > > > > > > > where
> > > > > > > > > > we
> > > > > > > > > > > >>>>>>>>> trigger
> > > > > > > > > > > >>>>>>>>>>> async
> > > > > > > > > > > >>>>>>>>>>>> flush before the commit? This would
> > certainly
> > > > be a
> > > > > > > > reason
> > > > > > > > > to
> > > > > > > > > > > >>>>>>>>> introduce
> > > > > > > > > > > >>>>>>>>>> a
> > > > > > > > > > > >>>>>>>>>>>> dedicated StateStore#commit method.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Thanks again for the feedback. I am going
> to
> > > > > update
> > > > > > > the
> > > > > > > > > KIP
> > > > > > > > > > > >>>>> and
> > > > > > > > > > > >>>>>>> then
> > > > > > > > > > > >>>>>>>>>>>> respond to the next batch of questions and
> > > > > > > suggestions.
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> Best,
> > > > > > > > > > > >>>>>>>>>>>> Alex
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> On Mon, May 30, 2022 at 5:13 PM Suhas
> Satish
> > > > > > > > > > > >>>>>>>>>>> <ssat...@confluent.io.invalid
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> Thanks for the KIP proposal Alex.
> > > > > > > > > > > >>>>>>>>>>>>> 1. Configuration default
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> You mention applications using streams
> DSL
> > > with
> > > > > > > > built-in
> > > > > > > > > > > >>>>>> rocksDB
> > > > > > > > > > > >>>>>>>>>> state
> > > > > > > > > > > >>>>>>>>>>>>> store will get transactional state stores
> > by
> > > > > > default
> > > > > > > > when
> > > > > > > > > > > >>>>> EOS
> > > > > > > > > > > >>>>>> is
> > > > > > > > > > > >>>>>>>>>>> enabled,
> > > > > > > > > > > >>>>>>>>>>>>> but the default implementation for apps
> > using
> > > > > PAPI
> > > > > > > will
> > > > > > > > > > > >>>>>> fallback
> > > > > > > > > > > >>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>> non-transactional behavior.
> > > > > > > > > > > >>>>>>>>>>>>> Shouldn't we have the same default
> behavior
> > > for
> > > > > > both
> > > > > > > > > types
> > > > > > > > > > > >>>>> of
> > > > > > > > > > > >>>>>>>>> apps -
> > > > > > > > > > > >>>>>>>>>>> DSL
> > > > > > > > > > > >>>>>>>>>>>>> and PAPI?
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> On Mon, May 30, 2022 at 2:11 AM Bruno
> > > Cadonna <
> > > > > > > > > > > >>>>>>> cado...@apache.org
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Thanks for the PR, Alex!
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> I am also glad to see this coming.
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> 1. Configuration
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> I would also prefer to restrict the
> > > > > configuration
> > > > > > of
> > > > > > > > > > > >>>>>>>>> transactional
> > > > > > > > > > > >>>>>>>>>> on
> > > > > > > > > > > >>>>>>>>>>>>>> the state sore. Ideally, calling method
> > > > > > > > transactional()
> > > > > > > > > > > >>>>> on
> > > > > > > > > > > >>>>>> the
> > > > > > > > > > > >>>>>>>>>> state
> > > > > > > > > > > >>>>>>>>>>>>>> store would be enough. An option on the
> > > store
> > > > > > > builder
> > > > > > > > > > > >>>>> would
> > > > > > > > > > > >>>>>>>>> make it
> > > > > > > > > > > >>>>>>>>>>>>>> possible to turn transactionality on and
> > off
> > > > (as
> > > > > > > John
> > > > > > > > > > > >>>>>>> proposed).
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> 2. Memory usage in RocksDB
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> This seems to be a major issue. We do
> not
> > > have
> > > > > any
> > > > > > > > > > > >>>>> guarantee
> > > > > > > > > > > >>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>>>>>> uncommitted writes fit into memory and I
> > > guess
> > > > > we
> > > > > > > will
> > > > > > > > > > > >>>>> never
> > > > > > > > > > > >>>>>>>>> have.
> > > > > > > > > > > >>>>>>>>>>> What
> > > > > > > > > > > >>>>>>>>>>>>>> happens when the uncommitted writes do
> not
> > > fit
> > > > > > into
> > > > > > > > > > > >>>>> memory?
> > > > > > > > > > > >>>>>>> Does
> > > > > > > > > > > >>>>>>>>>>>> RocksDB
> > > > > > > > > > > >>>>>>>>>>>>>> throw an exception? Can we handle such
> an
> > > > > > exception
> > > > > > > > > > > >>>>> without
> > > > > > > > > > > >>>>>>>>>> crashing?
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Does the RocksDB behavior even need to
> be
> > > > > included
> > > > > > > in
> > > > > > > > > > > >>>>> this
> > > > > > > > > > > >>>>>>> KIP?
> > > > > > > > > > > >>>>>>>>> In
> > > > > > > > > > > >>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>> end it is an implementation detail.
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> What we should consider - though - is a
> > > memory
> > > > > > limit
> > > > > > > > in
> > > > > > > > > > > >>>>> some
> > > > > > > > > > > >>>>>>>>> form.
> > > > > > > > > > > >>>>>>>>>>> And
> > > > > > > > > > > >>>>>>>>>>>>>> what we do when the memory limit is
> > > exceeded.
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> 3. PoC
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> I agree with Guozhang that a PoC is a
> good
> > > > idea
> > > > > to
> > > > > > > > > better
> > > > > > > > > > > >>>>>>>>>> understand
> > > > > > > > > > > >>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>> devils in the details.
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> Best,
> > > > > > > > > > > >>>>>>>>>>>>>> Bruno
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>> On 25.05.22 01:52, Guozhang Wang wrote:
> > > > > > > > > > > >>>>>>>>>>>>>>> Hello Alex,
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> Thanks for writing the proposal! Glad
> to
> > > see
> > > > it
> > > > > > > > > > > >>>>> coming. I
> > > > > > > > > > > >>>>>>>>> think
> > > > > > > > > > > >>>>>>>>>>> this
> > > > > > > > > > > >>>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>> kind of a KIP that since too many
> devils
> > > > would
> > > > > be
> > > > > > > > > > > >>>>> buried
> > > > > > > > > > > >>>>>> in
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>> details
> > > > > > > > > > > >>>>>>>>>>>>>> and
> > > > > > > > > > > >>>>>>>>>>>>>>> it's better to start working on a POC,
> > > either
> > > > > in
> > > > > > > > > > > >>>>> parallel,
> > > > > > > > > > > >>>>>>> or
> > > > > > > > > > > >>>>>>>>>>> before
> > > > > > > > > > > >>>>>>>>>>>> we
> > > > > > > > > > > >>>>>>>>>>>>>>> resume our discussion, rather than
> > blocking
> > > > any
> > > > > > > > > > > >>>>>>> implementation
> > > > > > > > > > > >>>>>>>>>>> until
> > > > > > > > > > > >>>>>>>>>>>> we
> > > > > > > > > > > >>>>>>>>>>>>>> are
> > > > > > > > > > > >>>>>>>>>>>>>>> satisfied with the proposal.
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> Just as a concrete example, I
> personally
> > am
> > > > > still
> > > > > > > not
> > > > > > > > > > > >>>>> 100%
> > > > > > > > > > > >>>>>>>>> clear
> > > > > > > > > > > >>>>>>>>>>> how
> > > > > > > > > > > >>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>> proposal would work to achieve EOS with
> > the
> > > > > state
> > > > > > > > > > > >>>>> stores.
> > > > > > > > > > > >>>>>>> For
> > > > > > > > > > > >>>>>>>>>>>> example,
> > > > > > > > > > > >>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>> commit procedure today looks like this:
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> 0: there's an existing checkpoint file
> > > > > indicating
> > > > > > > the
> > > > > > > > > > > >>>>>>>>> changelog
> > > > > > > > > > > >>>>>>>>>>>> offset
> > > > > > > > > > > >>>>>>>>>>>>> of
> > > > > > > > > > > >>>>>>>>>>>>>>> the local state store image is 100.
> Now a
> > > > > commit
> > > > > > is
> > > > > > > > > > > >>>>>>> triggered:
> > > > > > > > > > > >>>>>>>>>>>>>>> 1. flush cache (since it contains
> > partially
> > > > > > > processed
> > > > > > > > > > > >>>>>>>>> records),
> > > > > > > > > > > >>>>>>>>>>> make
> > > > > > > > > > > >>>>>>>>>>>>> sure
> > > > > > > > > > > >>>>>>>>>>>>>>> all records are written to the
> producer.
> > > > > > > > > > > >>>>>>>>>>>>>>> 2. flush producer, making sure all
> > > changelog
> > > > > > > records
> > > > > > > > > > > >>>>> have
> > > > > > > > > > > >>>>>>> now
> > > > > > > > > > > >>>>>>>>>>> acked.
> > > > > > > > > > > >>>>>>>>>>>> //
> > > > > > > > > > > >>>>>>>>>>>>>>> here we would get the new changelog
> > > position,
> > > > > say
> > > > > > > 200
> > > > > > > > > > > >>>>>>>>>>>>>>> 3. flush store, make sure all writes
> are
> > > > > > persisted.
> > > > > > > > > > > >>>>>>>>>>>>>>> 4. producer.sendOffsetsToTransaction();
> > > > > > > > > > > >>>>>>>>>>> producer.commitTransaction();
> > > > > > > > > > > >>>>>>>>>>>>> //
> > > > > > > > > > > >>>>>>>>>>>>>> we
> > > > > > > > > > > >>>>>>>>>>>>>>> would make the writes in changelog up
> to
> > > > offset
> > > > > > 200
> > > > > > > > > > > >>>>>>> committed
> > > > > > > > > > > >>>>>>>>>>>>>>> 5. Update the checkpoint file to 200.
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > > >>>>>>>>>>>>>>> The question is about atomicity between
> > those
> > > > > lines,
> > > > > > > for
> > > > > > > > > > > >>>>>>> example:
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> * If we crash between line 4 and line
> 5,
> > > the
> > > > > > local
> > > > > > > > > > > >>>>>>> checkpoint
> > > > > > > > > > > >>>>>>>>>> file
> > > > > > > > > > > >>>>>>>>>>>>> would
> > > > > > > > > > > >>>>>>>>>>>>>>> stay as 100, and upon recovery we would
> > > > replay
> > > > > > the
> > > > > > > > > > > >>>>>> changelog
> > > > > > > > > > > >>>>>>>>> from
> > > > > > > > > > > >>>>>>>>>>> 100
> > > > > > > > > > > >>>>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>>>> 200. This is not ideal but does not
> > violate
> > > > > EOS,
> > > > > > > > since
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>>>>> changelogs
> > > > > > > > > > > >>>>>>>>>>>>> are
> > > > > > > > > > > >>>>>>>>>>>>>>> all overwrites anyways.
> > > > > > > > > > > >>>>>>>>>>>>>>> * If we crash between line 3 and 4,
> then
> > at
> > > > > that
> > > > > > > time
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>>> local
> > > > > > > > > > > >>>>>>>>>>>>>> persistent
> > > > > > > > > > > >>>>>>>>>>>>>>> store image is representing as of
> offset
> > > 200,
> > > > > but
> > > > > > > > upon
> > > > > > > > > > > >>>>>>>>> recovery
> > > > > > > > > > > >>>>>>>>>> all
> > > > > > > > > > > >>>>>>>>>>>>>>> changelog records from 100 to
> > > log-end-offset
> > > > > > would
> > > > > > > be
> > > > > > > > > > > >>>>>>>>> considered
> > > > > > > > > > > >>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>>>> aborted
> > > > > > > > > > > >>>>>>>>>>>>>>> and not be replayed and we would
> restart
> > > > > > processing
> > > > > > > > > > > >>>>> from
> > > > > > > > > > > >>>>>>>>> position
> > > > > > > > > > > >>>>>>>>>>>> 100.
> > > > > > > > > > > > >>>>>>>>>>>>>>> Restart processing will violate EOS. I'm
> > not
> > > > > sure
> > > > > > > how
> > > > > > > > > > > >>>>> e.g.
> > > > > > > > > > > >>>>>>>>>> RocksDB's
> > > > > > > > > > > >>>>>>>>>>>>>>> WriteBatchWithIndex would make sure
> that
> > > the
> > > > > > step 4
> > > > > > > > and
> > > > > > > > > > > >>>>>>> step 5
> > > > > > > > > > > >>>>>>>>>>> could
> > > > > > > > > > > >>>>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>>>> done atomically here.
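
The two crash windows described above can be modelled with three offsets. This is only a minimal sketch under stated assumptions: the class, field, and method names are hypothetical and are not Kafka Streams APIs, and the model tracks offsets only, not actual records. It shows that a crash after step 3 leaves the persisted store ahead of the committed changelog, while a crash after step 4 only costs a replay.

```java
// Hypothetical model of the five-step commit procedure quoted above.
// None of these names exist in Kafka Streams; they are illustration only.
final class CommitCrashSketch {

    static final class State {
        long checkpoint = 100;          // offset recorded in the local checkpoint file
        long storeImage = 100;          // offset the persisted store image reflects
        long committedChangelog = 100;  // last changelog offset covered by a committed txn
    }

    // Runs the procedure and "crashes" after the given number of completed steps (0..5).
    static State runUntilCrash(int completedSteps, long newOffset) {
        State s = new State();
        // Steps 1 and 2 (flush cache, flush producer) change none of the three offsets.
        if (completedSteps >= 3) s.storeImage = newOffset;          // step 3: flush store
        if (completedSteps >= 4) s.committedChangelog = newOffset;  // step 4: commit txn
        if (completedSteps >= 5) s.checkpoint = newOffset;          // step 5: write checkpoint
        return s;
    }

    // True when the local store contains writes whose changelog records were never committed.
    static boolean storeHasUncommittedWrites(State s) {
        return s.storeImage > s.committedChangelog;
    }

    // Number of committed changelog offsets that recovery must replay from the checkpoint.
    static long recordsToReplay(State s) {
        return s.committedChangelog - s.checkpoint;
    }

    public static void main(String[] args) {
        for (int steps = 3; steps <= 5; steps++) {
            State s = runUntilCrash(steps, 200);
            System.out.println("crash after step " + steps
                + ": uncommitted writes in store = " + storeHasUncommittedWrites(s)
                + ", offsets to replay = " + recordsToReplay(s));
        }
    }
}
```

In this model the unsafe window is exactly the one between steps 3 and 4: the store already reflects offset 200 while the changelog transaction is still open.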
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> Originally what I was thinking when
> > > creating
> > > > > the
> > > > > > > JIRA
> > > > > > > > > > > >>>>>> ticket
> > > > > > > > > > > >>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>>>> we
> > > > > > > > > > > > >>>>>>>>>>>>>>> need to let the state store provide
> a
> > > > > > > > transactional
> > > > > > > > > > > >>>>> API
> > > > > > > > > > > >>>>>>>>> like
> > > > > > > > > > > >>>>>>>>>>>> "token
> > > > > > > > > > > >>>>>>>>>>>>>>> commit()" used in step 4) above which
> > > > returns a
> > > > > > > > token,
> > > > > > > > > > > >>>>>> that
> > > > > > > > > > > >>>>>>>>> e.g.
> > > > > > > > > > > >>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>> our
> > > > > > > > > > > >>>>>>>>>>>>>>> example above indicates offset 200, and
> > > that
> > > > > > token
> > > > > > > > > > > >>>>> would
> > > > > > > > > > > >>>>>> be
> > > > > > > > > > > >>>>>>>>>> written
> > > > > > > > > > > >>>>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>>>> part
> > > > > > > > > > > >>>>>>>>>>>>>>> of the records in Kafka transaction in
> > step
> > > > 5).
> > > > > > And
> > > > > > > > > > > >>>>> upon
> > > > > > > > > > > >>>>>>>>> recovery
> > > > > > > > > > > >>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>> state
> > > > > > > > > > > >>>>>>>>>>>>>>> store would have another API like
> > > > > > "rollback(token)"
> > > > > > > > > > > >>>>> where
> > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > >>>>>>>>>> token
> > > > > > > > > > > >>>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>>>>> read
> > > > > > > > > > > >>>>>>>>>>>>>>> from the latest committed txn, and be
> > used
> > > to
> > > > > > > > rollback
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>>> store
> > > > > > > > > > > >>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>>>>>>> committed image. I think your proposal
> is
> > > > > > > different,
> > > > > > > > > > > >>>>> and
> > > > > > > > > > > >>>>>> it
> > > > > > > > > > > >>>>>>>>> seems
> > > > > > > > > > > >>>>>>>>>>>> like
> > > > > > > > > > > >>>>>>>>>>>>>>> you're proposing we swap step 3) and 4)
> > > > above,
> > > > > > but
> > > > > > > > the
> > > > > > > > > > > >>>>>>>>> atomicity
> > > > > > > > > > > >>>>>>>>>>>> issue
> > > > > > > > > > > >>>>>>>>>>>>>>> still remains since now you may have
> the
> > > > store
> > > > > > > image
> > > > > > > > at
> > > > > > > > > > > >>>>>> 100
> > > > > > > > > > > >>>>>>>>> but
> > > > > > > > > > > >>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>> changelog is committed at 200. I'd like
> > to
> > > > > learn
> > > > > > > more
> > > > > > > > > > > >>>>>> about
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>> details
> > > > > > > > > > > >>>>>>>>>>>>>>> on how it resolves such issues.
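
The "token commit()" and "rollback(token)" idea from the JIRA ticket could look roughly like the following. This is a sketch only, assuming an in-memory store that keeps a full copy of its contents per token; the class name, the use of the changelog offset as the token, and the snapshot strategy are all assumptions made for illustration, not part of the KIP or of any existing Kafka Streams interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory store with token-based commit and rollback.
final class TokenStoreSketch {
    private final Map<String, String> live = new HashMap<>();
    // Committed images keyed by token; here the token is simply the changelog offset.
    private final TreeMap<Long, Map<String, String>> images = new TreeMap<>();

    void put(String key, String value) { live.put(key, value); }

    String get(String key) { return live.get(key); }

    // Records the current contents as a committed image and returns its token.
    long commit(long changelogOffset) {
        images.put(changelogOffset, new HashMap<>(live));
        return changelogOffset;
    }

    // Restores the image identified by the token and discards any later images.
    void rollback(long token) {
        Map<String, String> image = images.get(token);
        if (image == null) {
            throw new IllegalArgumentException("unknown token " + token);
        }
        live.clear();
        live.putAll(image);
        images.tailMap(token, false).clear();
    }

    public static void main(String[] args) {
        TokenStoreSketch store = new TokenStoreSketch();
        store.put("k", "v1");
        long token = store.commit(100);
        store.put("k", "v2");          // write that never made it into a committed txn
        store.rollback(token);         // recovery: return to the image at offset 100
        System.out.println(store.get("k"));
    }
}
```

A real store would not copy the whole keyspace on every commit; the point here is only the contract: the token read from the latest committed transaction identifies the image the store must return to.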
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> Anyways, that's just an example to make
> > the
> > > > > point
> > > > > > > > that
> > > > > > > > > > > >>>>>> there
> > > > > > > > > > > >>>>>>>>> are
> > > > > > > > > > > >>>>>>>>>>> lots
> > > > > > > > > > > >>>>>>>>>>>>> of
> > > > > > > > > > > > >>>>>>>>>>>>>>> implementation details which would
> > drive
> > > > the
> > > > > > > public
> > > > > > > > > > > >>>>> API
> > > > > > > > > > > >>>>>>>>> design,
> > > > > > > > > > > >>>>>>>>>>> and
> > > > > > > > > > > >>>>>>>>>>>>> we
> > > > > > > > > > > >>>>>>>>>>>>>>> should probably first do a POC, and
> come
> > > back
> > > > > to
> > > > > > > > > > > >>>>> discuss
> > > > > > > > > > > >>>>>> the
> > > > > > > > > > > >>>>>>>>> KIP.
> > > > > > > > > > > >>>>>>>>>>> Let
> > > > > > > > > > > >>>>>>>>>>>>> me
> > > > > > > > > > > >>>>>>>>>>>>>>> know what you think?
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> Guozhang
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>> On Tue, May 24, 2022 at 10:35 AM Sagar
> <
> > > > > > > > > > > >>>>>>>>>> sagarmeansoc...@gmail.com>
> > > > > > > > > > > >>>>>>>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Hi Alexander,
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks for the KIP! This seems like a
> > > great
> > > > > > > > proposal.
> > > > > > > > > > > >>>>> I
> > > > > > > > > > > >>>>>>> have
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>> same
> > > > > > > > > > > >>>>>>>>>>>>>>>> opinion as John on the Configuration
> > part
> > > > > > though.
> > > > > > > I
> > > > > > > > > > > >>>>> think
> > > > > > > > > > > >>>>>>>>> the 2
> > > > > > > > > > > >>>>>>>>>>>> level
> > > > > > > > > > > >>>>>>>>>>>>>>>> config and its behaviour based on the
> > > > > > > > > > > >>>>> setting/unsetting
> > > > > > > > > > > >>>>>> of
> > > > > > > > > > > >>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>> flag
> > > > > > > > > > > >>>>>>>>>>>>>> seems
> > > > > > > > > > > >>>>>>>>>>>>>>>> confusing to me as well. Since the KIP
> > > seems
> > > > > > > > > > > >>>>> specifically
> > > > > > > > > > > >>>>>>>>>> centred
> > > > > > > > > > > >>>>>>>>>>>>> around
> > > > > > > > > > > >>>>>>>>>>>>>>>> RocksDB it might be better to add it
> at
> > > the
> > > > > > > Supplier
> > > > > > > > > > > >>>>>> level
> > > > > > > > > > > >>>>>>> as
> > > > > > > > > > > >>>>>>>>>> John
> > > > > > > > > > > >>>>>>>>>>>>>>>> suggested.
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> On similar lines, this config name =>
> > > > > > > > > > > >>>>>>>>>>>>>> *statestore.transactional.mechanism
> > > > > > > > > > > >>>>>>>>>>>>>>>> *may
> > > > > > > > > > > >>>>>>>>>>>>>>>> also need rethinking as the value
> > assigned
> > > > to
> > > > > > > > > > > > >>>>>>>>>>> it (rocksdb_indexbatch)
> > > > > > > > > > > >>>>>>>>>>>>>>>> implicitly seems to assume that
> rocksdb
> > is
> > > > the
> > > > > > > only
> > > > > > > > > > > >>>>>>>>> statestore
> > > > > > > > > > > >>>>>>>>>>> that
> > > > > > > > > > > >>>>>>>>>>>>>> Kafka
> > > > > > > > > > > > >>>>>>>>>>>>>>>> Streams supports, while that's not the
> > case.
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Also, regarding the potential memory
> > > > pressure
> > > > > > that
> > > > > > > > > > > >>>>> can be
> > > > > > > > > > > >>>>>>>>>>> introduced
> > > > > > > > > > > >>>>>>>>>>>>> by
> > > > > > > > > > > > >>>>>>>>>>>>>>>> WriteBatchWithIndex, do you think it might
> > > make
> > > > > more
> > > > > > > > > > > >>>>> sense to
> > > > > > > > > > > >>>>>>>>>> include
> > > > > > > > > > > >>>>>>>>>>>> some
> > > > > > > > > > > >>>>>>>>>>>>>>>> numbers/benchmarks on how much the
> > memory
> > > > > > > > consumption
> > > > > > > > > > > >>>>>> might
> > > > > > > > > > > >>>>>>>>>>>> increase?
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Lastly, the read_uncommitted flag's
> > > > behaviour
> > > > > on
> > > > > > > IQ
> > > > > > > > > > > >>>>> may
> > > > > > > > > > > >>>>>>> need
> > > > > > > > > > > >>>>>>>>>> more
> > > > > > > > > > > >>>>>>>>>>>>>>>> elaboration.
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> These points aside, as I said, this
> is a
> > > > great
> > > > > > > > > > > >>>>> proposal!
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks!
> > > > > > > > > > > >>>>>>>>>>>>>>>> Sagar.
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>> On Tue, May 24, 2022 at 10:35 PM John
> > > > Roesler
> > > > > <
> > > > > > > > > > > >>>>>>>>>>> vvcep...@apache.org>
> > > > > > > > > > > >>>>>>>>>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Thanks for the KIP, Alex!
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> I'm really happy to see your
> proposal.
> > > This
> > > > > > > > > > > >>>>> improvement
> > > > > > > > > > > >>>>>>>>> fills a
> > > > > > > > > > > >>>>>>>>>>>>>>>>> long-standing gap.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> I have a few questions:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> 1. Configuration
> > > > > > > > > > > >>>>>>>>>>>>>>>>> The KIP only mentions RocksDB, but of
> > > > course,
> > > > > > > > Streams
> > > > > > > > > > > >>>>>> also
> > > > > > > > > > > >>>>>>>>>> ships
> > > > > > > > > > > >>>>>>>>>>>> with
> > > > > > > > > > > >>>>>>>>>>>>>> an
> > > > > > > > > > > >>>>>>>>>>>>>>>>> InMemory store, and users also plug
> in
> > > > their
> > > > > > own
> > > > > > > > > > > >>>>> custom
> > > > > > > > > > > >>>>>>>>> state
> > > > > > > > > > > >>>>>>>>>>>> stores.
> > > > > > > > > > > >>>>>>>>>>>>>> It
> > > > > > > > > > > >>>>>>>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>>>>>>>> also common to use multiple types of
> > > state
> > > > > > stores
> > > > > > > > in
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>>> same
> > > > > > > > > > > >>>>>>>>>>>>>> application
> > > > > > > > > > > >>>>>>>>>>>>>>>>> for different purposes.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Against this backdrop, the choice to
> > > > > configure
> > > > > > > > > > > >>>>>>>>> transactionality
> > > > > > > > > > > >>>>>>>>>>> as
> > > > > > > > > > > >>>>>>>>>>>> a
> > > > > > > > > > > >>>>>>>>>>>>>>>>> top-level config, as well as to
> > configure
> > > > the
> > > > > > > store
> > > > > > > > > > > >>>>>>>>> transaction
> > > > > > > > > > > >>>>>>>>>>>>>> mechanism
> > > > > > > > > > > >>>>>>>>>>>>>>>>> as a top-level config, seems a bit
> off.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Did you consider instead just adding
> > the
> > > > > option
> > > > > > > to
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>>> RocksDB*StoreSupplier classes and the
> > > > > factories
> > > > > > > in
> > > > > > > > > > > >>>>>> Stores
> > > > > > > > > > > >>>>>>> ?
> > > > > > > > > > > >>>>>>>>> It
> > > > > > > > > > > >>>>>>>>>>>> seems
> > > > > > > > > > > >>>>>>>>>>>>>> like
> > > > > > > > > > > >>>>>>>>>>>>>>>>> the desire to enable the feature by
> > > > default,
> > > > > > but
> > > > > > > > > > > >>>>> with a
> > > > > > > > > > > >>>>>>>>>>>> feature-flag
> > > > > > > > > > > >>>>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>>>>>> disable it was a factor here.
> However,
> > as
> > > > you
> > > > > > > > pointed
> > > > > > > > > > > >>>>>> out,
> > > > > > > > > > > >>>>>>>>>> there
> > > > > > > > > > > >>>>>>>>>>>> are
> > > > > > > > > > > >>>>>>>>>>>>>> some
> > > > > > > > > > > >>>>>>>>>>>>>>>>> major considerations that users
> should
> > be
> > > > > aware
> > > > > > > of,
> > > > > > > > > > > >>>>> so
> > > > > > > > > > > >>>>>>>>> opt-in
> > > > > > > > > > > >>>>>>>>>>>> doesn't
> > > > > > > > > > > >>>>>>>>>>>>>>>> seem
> > > > > > > > > > > >>>>>>>>>>>>>>>>> like a bad choice, either. You could
> > add
> > > an
> > > > > > Enum
> > > > > > > > > > > >>>>>> argument
> > > > > > > > > > > >>>>>>> to
> > > > > > > > > > > >>>>>>>>>>> those
> > > > > > > > > > > >>>>>>>>>>>>>>>>> factories like
> > > > > > > > `RocksDBTransactionalMechanism.{NONE,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Some points in favor of this
> approach:
> > > > > > > > > > > >>>>>>>>>>>>>>>>> * Avoid "stores that don't support
> > > > > transactions
> > > > > > > > > > > >>>>> ignore
> > > > > > > > > > > >>>>>> the
> > > > > > > > > > > >>>>>>>>>>> config"
> > > > > > > > > > > >>>>>>>>>>>>>>>>> complexity
> > > > > > > > > > > >>>>>>>>>>>>>>>>> * Users can choose how to spend their
> > > > memory
> > > > > > > > budget,
> > > > > > > > > > > >>>>>>> making
> > > > > > > > > > > >>>>>>>>>> some
> > > > > > > > > > > >>>>>>>>>>>>> stores
> > > > > > > > > > > >>>>>>>>>>>>>>>>> transactional and others not
> > > > > > > > > > > >>>>>>>>>>>>>>>>> * When we add transactional support
> to
> > > > > > in-memory
> > > > > > > > > > > >>>>> stores,
> > > > > > > > > > > >>>>>>> we
> > > > > > > > > > > >>>>>>>>>> don't
> > > > > > > > > > > >>>>>>>>>>>>> have
> > > > > > > > > > > >>>>>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>>>>>> figure out what to do with the
> > mechanism
> > > > > config
> > > > > > > > > > > >>>>> (i.e.,
> > > > > > > > > > > >>>>>>> what
> > > > > > > > > > > >>>>>>>>> do
> > > > > > > > > > > >>>>>>>>>>> you
> > > > > > > > > > > >>>>>>>>>>>>> set
> > > > > > > > > > > >>>>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>>> mechanism to when there are multiple
> > > kinds
> > > > of
> > > > > > > > > > > >>>>>>> transactional
> > > > > > > > > > > >>>>>>>>>>> stores
> > > > > > > > > > > >>>>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>>> topology?)
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> 2. caching/flushing/transactions
> > > > > > > > > > > >>>>>>>>>>>>>>>>> The coupling between memory usage and
> > > > > flushing
> > > > > > > that
> > > > > > > > > > > >>>>> you
> > > > > > > > > > > >>>>>>>>>> mentioned
> > > > > > > > > > > >>>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>>>> a
> > > > > > > > > > > >>>>>>>>>>>>>>>> bit
> > > > > > > > > > > >>>>>>>>>>>>>>>>> troubling. It also occurs to me that
> > > there
> > > > > > seems
> > > > > > > to
> > > > > > > > > > > >>>>> be
> > > > > > > > > > > >>>>>>> some
> > > > > > > > > > > >>>>>>>>>>>>>> relationship
> > > > > > > > > > > >>>>>>>>>>>>>>>>> with the existing record cache, which
> > is
> > > > also
> > > > > > an
> > > > > > > > > > > >>>>>> in-memory
> > > > > > > > > > > >>>>>>>>>>> holding
> > > > > > > > > > > >>>>>>>>>>>>> area
> > > > > > > > > > > >>>>>>>>>>>>>>>> for
> > > > > > > > > > > >>>>>>>>>>>>>>>>> records that are not yet written to
> the
> > > > cache
> > > > > > > > and/or
> > > > > > > > > > > >>>>>> store
> > > > > > > > > > > >>>>>>>>>>> (albeit
> > > > > > > > > > > >>>>>>>>>>>>> with
> > > > > > > > > > > >>>>>>>>>>>>>>>> no
> > > > > > > > > > > >>>>>>>>>>>>>>>>> particular semantics). Have you
> > > considered
> > > > > how
> > > > > > > all
> > > > > > > > > > > >>>>> these
> > > > > > > > > > > >>>>>>>>>>> components
> > > > > > > > > > > >>>>>>>>>>>>>>>> should
> > > > > > > > > > > >>>>>>>>>>>>>>>>> relate? For example, should a "full"
> > > > > WriteBatch
> > > > > > > > > > > >>>>> actually
> > > > > > > > > > > >>>>>>>>>> trigger
> > > > > > > > > > > >>>>>>>>>>> a
> > > > > > > > > > > >>>>>>>>>>>>>> flush
> > > > > > > > > > > >>>>>>>>>>>>>>>> so
> > > > > > > > > > > >>>>>>>>>>>>>>>>> that we don't get OOMEs? If the
> > proposed
> > > > > > > > > > > >>>>> transactional
> > > > > > > > > > > >>>>>>>>>> mechanism
> > > > > > > > > > > >>>>>>>>>>>>> forces
> > > > > > > > > > > >>>>>>>>>>>>>>>> all
> > > > > > > > > > > >>>>>>>>>>>>>>>>> uncommitted writes to be buffered in
> > > > memory,
> > > > > > > until
> > > > > > > > a
> > > > > > > > > > > >>>>>>> commit,
> > > > > > > > > > > >>>>>>>>>> then
> > > > > > > > > > > >>>>>>>>>>>>> what
> > > > > > > > > > > >>>>>>>>>>>>>> is
> > > > > > > > > > > >>>>>>>>>>>>>>>>> the advantage over just doing the
> same
> > > > thing
> > > > > > with
> > > > > > > > the
> > > > > > > > > > > >>>>>>>>>> RecordCache
> > > > > > > > > > > >>>>>>>>>>>> and
> > > > > > > > > > > >>>>>>>>>>>>>> not
> > > > > > > > > > > >>>>>>>>>>>>>>>>> introducing the WriteBatch at all?
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> 3. ALOS
> > > > > > > > > > > >>>>>>>>>>>>>>>>> You mentioned that a transactional
> > store
> > > > can
> > > > > > help
> > > > > > > > > > > >>>>> reduce
> > > > > > > > > > > >>>>>>>>>>>> duplication
> > > > > > > > > > > >>>>>>>>>>>>> in
> > > > > > > > > > > >>>>>>>>>>>>>>>>> the case of ALOS. We might want to be
> > > > careful
> > > > > > > about
> > > > > > > > > > > >>>>>> claims
> > > > > > > > > > > >>>>>>>>> like
> > > > > > > > > > > >>>>>>>>>>>> that.
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Duplication isn't the way that
> repeated
> > > > > > > processing
> > > > > > > > > > > >>>>>>>>> manifests in
> > > > > > > > > > > >>>>>>>>>>>> state
> > > > > > > > > > > >>>>>>>>>>>>>>>>> stores. Rather, it is in the form of
> > > dirty
> > > > > > reads
> > > > > > > > > > > >>>>> during
> > > > > > > > > > > >>>>>>>>>>>> reprocessing.
> > > > > > > > > > > >>>>>>>>>>>>>>>> This
> > > > > > > > > > > >>>>>>>>>>>>>>>>> feature may reduce the incidence of
> > dirty
> > > > > reads
> > > > > > > > > > > >>>>> during
> > > > > > > > > > > >>>>>>>>>>>> reprocessing,
> > > > > > > > > > > >>>>>>>>>>>>>> but
> > > > > > > > > > > >>>>>>>>>>>>>>>>> not in a predictable way. During
> > regular
> > > > > > > processing
> > > > > > > > > > > >>>>>> today,
> > > > > > > > > > > >>>>>>>>> we
> > > > > > > > > > > >>>>>>>>>>> will
> > > > > > > > > > > >>>>>>>>>>>>> send
> > > > > > > > > > > >>>>>>>>>>>>>>>>> some records through to the changelog
> > in
> > > > > > between
> > > > > > > > > > > >>>>> commit
> > > > > > > > > > > >>>>>>>>>>> intervals.
> > > > > > > > > > > >>>>>>>>>>>>>> Under
> > > > > > > > > > > >>>>>>>>>>>>>>>>> ALOS, if any of those dirty writes
> gets
> > > > > > committed
> > > > > > > > to
> > > > > > > > > > > >>>>> the
> > > > > > > > > > > >>>>>>>>>>> changelog
> > > > > > > > > > > >>>>>>>>>>>>>> topic,
> > > > > > > > > > > >>>>>>>>>>>>>>>>> then upon failure, we have to roll
> the
> > > > store
> > > > > > > > forward
> > > > > > > > > > > >>>>> to
> > > > > > > > > > > >>>>>>> them
> > > > > > > > > > > >>>>>>>>>>>> anyway,
> > > > > > > > > > > >>>>>>>>>>>>>>>>> regardless of this new transactional
> > > > > mechanism.
> > > > > > > > > > > >>>>> That's a
> > > > > > > > > > > >>>>>>>>>> fixable
> > > > > > > > > > > >>>>>>>>>>>>>> problem,
> > > > > > > > > > > >>>>>>>>>>>>>>>>> by the way, but this KIP doesn't seem
> > to
> > > > fix
> > > > > > it.
> > > > > > > I
> > > > > > > > > > > >>>>>> wonder
> > > > > > > > > > > >>>>>>>>> if we
> > > > > > > > > > > >>>>>>>>>>>>> should
> > > > > > > > > > > >>>>>>>>>>>>>>>> make
> > > > > > > > > > > >>>>>>>>>>>>>>>>> any claims about the relationship of
> > this
> > > > > > feature
> > > > > > > > to
> > > > > > > > > > > >>>>>> ALOS
> > > > > > > > > > > >>>>>>> if
> > > > > > > > > > > >>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>> real-world
> > > > > > > > > > > >>>>>>>>>>>>>>>>> behavior is so complex.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> 4. IQ
> > > > > > > > > > > >>>>>>>>>>>>>>>>> As a reminder, we have a new IQv2
> > > mechanism
> > > > > > now.
> > > > > > > > > > > >>>>> Should
> > > > > > > > > > > >>>>>> we
> > > > > > > > > > > >>>>>>>>>>> propose
> > > > > > > > > > > >>>>>>>>>>>>> any
> > > > > > > > > > > >>>>>>>>>>>>>>>>> changes to IQv1 to support this
> > > > transactional
> > > > > > > > > > > >>>>> mechanism,
> > > > > > > > > > > >>>>>>>>> versus
> > > > > > > > > > > >>>>>>>>>>>> just
> > > > > > > > > > > >>>>>>>>>>>>>>>>> proposing it for IQv2? Certainly, it
> > > seems
> > > > > > > strange
> > > > > > > > > > > >>>>> only
> > > > > > > > > > > >>>>>> to
> > > > > > > > > > > >>>>>>>>>>> propose
> > > > > > > > > > > >>>>>>>>>>>> a
> > > > > > > > > > > >>>>>>>>>>>>>>>> change
> > > > > > > > > > > >>>>>>>>>>>>>>>>> for IQv1 and not v2.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Regarding your proposal for IQv1, I'm
> > > > unsure
> > > > > > what
> > > > > > > > the
> > > > > > > > > > > >>>>>>>>> behavior
> > > > > > > > > > > >>>>>>>>>>>> should
> > > > > > > > > > > >>>>>>>>>>>>>> be
> > > > > > > > > > > >>>>>>>>>>>>>>>>> for readCommitted, since the current
> > > > behavior
> > > > > > > also
> > > > > > > > > > > >>>>> reads
> > > > > > > > > > > >>>>>>>>> out of
> > > > > > > > > > > >>>>>>>>>>> the
> > > > > > > > > > > >>>>>>>>>>>>>>>>> RecordCache. I guess if
> > > > readCommitted==false,
> > > > > > > then
> > > > > > > > we
> > > > > > > > > > > >>>>>> will
> > > > > > > > > > > >>>>>>>>>>> continue
> > > > > > > > > > > >>>>>>>>>>>>> to
> > > > > > > > > > > >>>>>>>>>>>>>>>> read
> > > > > > > > > > > >>>>>>>>>>>>>>>>> from the cache first, then the Batch,
> > > then
> > > > > the
> > > > > > > > store;
> > > > > > > > > > > >>>>>> and
> > > > > > > > > > > >>>>>>> if
> > > > > > > > > > > >>>>>>>>>>>>>>>>> readCommitted==true, we would skip
> the
> > > > cache
> > > > > > and
> > > > > > > > the
> > > > > > > > > > > >>>>>> Batch
> > > > > > > > > > > >>>>>>>>> and
> > > > > > > > > > > >>>>>>>>>>> only
> > > > > > > > > > > >>>>>>>>>>>>>> read
> > > > > > > > > > > >>>>>>>>>>>>>>>>> from the persistent RocksDB store?
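
The lookup order guessed at above can be written down as a small sketch. It assumes three layers modelled as plain maps (record cache, uncommitted write batch, committed store) and ignores deletes/tombstones; all names are hypothetical and this is not how IQ is implemented in Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical three-layer read path for interactive queries.
final class LayeredReadSketch {
    final Map<String, String> recordCache = new HashMap<>();    // not yet written to the store
    final Map<String, String> writeBatch = new HashMap<>();     // written but uncommitted
    final Map<String, String> committedStore = new HashMap<>(); // persisted and committed

    Optional<String> get(String key, boolean readCommitted) {
        if (!readCommitted) {
            // Newest data first: cache, then the uncommitted batch.
            if (recordCache.containsKey(key)) return Optional.of(recordCache.get(key));
            if (writeBatch.containsKey(key)) return Optional.of(writeBatch.get(key));
        }
        // readCommitted == true skips both in-memory layers.
        return Optional.ofNullable(committedStore.get(key));
    }

    public static void main(String[] args) {
        LayeredReadSketch store = new LayeredReadSketch();
        store.committedStore.put("k", "committed");
        store.writeBatch.put("k", "batched");
        store.recordCache.put("k", "cached");
        System.out.println(store.get("k", false));
        System.out.println(store.get("k", true));
    }
}
```

For a non-transactional store there is no batch layer, so the open question in the next paragraph is what readCommitted should mean when only the cache and the store exist.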
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> What should IQ do if I request to
> > > > > readCommitted
> > > > > > > on
> > > > > > > > a
> > > > > > > > > > > >>>>>>>>>>>>> non-transactional
> > > > > > > > > > > >>>>>>>>>>>>>>>>> store?
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Thanks again for proposing the KIP,
> and
> > > my
> > > > > > > > apologies
> > > > > > > > > > > >>>>> for
> > > > > > > > > > > >>>>>>> the
> > > > > > > > > > > >>>>>>>>>> long
> > > > > > > > > > > >>>>>>>>>>>>>> reply;
> > > > > > > > > > > >>>>>>>>>>>>>>>>> I'm hoping to air all my concerns in
> > one
> > > > > > "batch"
> > > > > > > to
> > > > > > > > > > > >>>>> save
> > > > > > > > > > > >>>>>>>>> time
> > > > > > > > > > > >>>>>>>>>> for
> > > > > > > > > > > >>>>>>>>>>>>> you.
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> Thanks,
> > > > > > > > > > > >>>>>>>>>>>>>>>>> -John
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>> On Tue, May 24, 2022, at 03:45,
> > Alexander
> > > > > > > > Sorokoumov
> > > > > > > > > > > >>>>>>> wrote:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> Hi all,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> I've written a KIP for making Kafka
> > > > Streams
> > > > > > > state
> > > > > > > > > > > >>>>>> stores
> > > > > > > > > > > >>>>>>>>>>>>> transactional
> > > > > > > > > > > >>>>>>>>>>>>>>>>> and
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> would like to start a discussion:
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> Best,
> > > > > > > > > > > >>>>>>>>>>>>>>>>>> Alex
> > > > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> --
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>> Suhas Satish
> > > > > > > > > > > >>>>>>>>>>>>> Engineering Manager
> > > > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>> --
> > > > > > > > > > > >>>>>>>>>>> -- Guozhang
> > > > > > > > > > > >>>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>> --
> > > > > > > > > > > >>>>>>>>> -- Guozhang
> > > > > > > > > > > >>>>>>>>>
> > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > >>>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>> --
> > > > > > > > > > > >>>>>> -- Guozhang
> > > > > > > > > > > >>>>>>
> > > > > > > > > > > >>>>>
> > > > > > > > > > > >>>>
> > > > > > > > > > > >>>
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
